1
0

Make commit a public method. Introduce a auto-commit config. Relates issue https://github.com/uber/hoodie/issues/58 (#60)

This commit is contained in:
prazanna
2017-01-10 22:14:40 -08:00
committed by vinoth chandar
parent 9d09a58a18
commit e4e3395f3e
3 changed files with 67 additions and 14 deletions

View File

@@ -206,10 +206,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Update the index back. // Update the index back.
JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, metadata); JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, metadata);
resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel()); resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel());
boolean commitResult = commit(commitTime, resultRDD); commitOnAutoCommit(commitTime, resultRDD);
if (!commitResult) {
throw new HoodieCommitException("Failed to commit " + commitTime);
}
return resultRDD; return resultRDD;
} catch (Throwable e) { } catch (Throwable e) {
if (e instanceof HoodieUpsertException) { if (e instanceof HoodieUpsertException) {
@@ -219,6 +216,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
} }
} }
private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
if(config.shouldAutoCommit()) {
logger.info("Auto commit enabled: Committing " + commitTime);
boolean commitResult = commit(commitTime, resultRDD);
if (!commitResult) {
throw new HoodieCommitException("Failed to commit " + commitTime);
}
} else {
logger.info("Auto commit disabled for " + commitTime);
}
}
private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition,
JavaRDD<HoodieRecord<T>> records, int parallelism) { JavaRDD<HoodieRecord<T>> records, int parallelism) {
if(condition) { if(condition) {
@@ -271,10 +280,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metadata); JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metadata);
// Trigger the insert and collect statuses // Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel()); statuses = statuses.persist(config.getWriteStatusStorageLevel());
boolean commitResult = commit(commitTime, statuses); commitOnAutoCommit(commitTime, statuses);
if (!commitResult) {
throw new HoodieCommitException("Failed to commit " + commitTime);
}
return statuses; return statuses;
} catch (Throwable e) { } catch (Throwable e) {
if (e instanceof HoodieInsertException) { if (e instanceof HoodieInsertException) {
@@ -287,7 +293,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
/** /**
* Commit changes performed at the given commitTime marker * Commit changes performed at the given commitTime marker
*/ */
private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) { public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
logger.info("Comitting " + commitTime);
Path commitFile = Path commitFile =
new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime)); new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime));
try { try {
@@ -331,6 +338,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
writeContext = null; writeContext = null;
} }
} }
logger.info("Status of the commit " + commitTime + ": " + success);
return success; return success;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieCommitException( throw new HoodieCommitException(

View File

@@ -46,6 +46,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true"; private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level"; private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
private HoodieWriteConfig(Properties props) { private HoodieWriteConfig(Properties props) {
super(props); super(props);
@@ -66,6 +69,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(TABLE_NAME); return props.getProperty(TABLE_NAME);
} }
public Boolean shouldAutoCommit() {
return Boolean.parseBoolean(props.getProperty(HOODIE_AUTO_COMMIT_PROP));
}
public int getInsertShuffleParallelism() { public int getInsertShuffleParallelism() {
return Integer.parseInt(props.getProperty(INSERT_PARALLELISM)); return Integer.parseInt(props.getProperty(INSERT_PARALLELISM));
} }
@@ -211,6 +218,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isStorageConfigSet = false; private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false; private boolean isCompactionConfigSet = false;
private boolean isMetricsConfigSet = false; private boolean isMetricsConfigSet = false;
private boolean isAutoCommit = true;
public Builder fromFile(File propertiesFile) throws IOException { public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile); FileReader reader = new FileReader(propertiesFile);
@@ -279,6 +287,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this; return this;
} }
public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
}
public HoodieWriteConfig build() { public HoodieWriteConfig build() {
HoodieWriteConfig config = new HoodieWriteConfig(props); HoodieWriteConfig config = new HoodieWriteConfig(props);
// Check for mandatory properties // Check for mandatory properties
@@ -293,6 +306,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
COMBINE_BEFORE_UPSERT_PROP, DEFAULT_COMBINE_BEFORE_UPSERT); COMBINE_BEFORE_UPSERT_PROP, DEFAULT_COMBINE_BEFORE_UPSERT);
setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL),
WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL); WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP),
HOODIE_AUTO_COMMIT_PROP, DEFAULT_HOODIE_AUTO_COMMIT);
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build()); setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build());

View File

@@ -148,6 +148,35 @@ public class TestHoodieClient implements Serializable {
assertTrue(result.size() == 25); assertTrue(result.size() == 25);
} }
@Test
public void testAutoCommit() throws Exception {
// Set autoCommit false
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
String newCommitTime = "001";
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
assertFalse("If Autocommit is false, then commit should not be made automatically",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
assertTrue("Commit should succeed", client.commit(newCommitTime, result));
assertTrue("After explicit commit, commit file should be created",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
newCommitTime = "002";
records = dataGen.generateUpdates(newCommitTime, 100);
JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(records, 1);
result = client.upsert(writeRecords, newCommitTime);
assertFalse("If Autocommit is false, then commit should not be made automatically",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
assertTrue("Commit should succeed", client.commit(newCommitTime, result));
assertTrue("After explicit commit, commit file should be created",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
}
@Test @Test
public void testUpserts() throws Exception { public void testUpserts() throws Exception {
HoodieWriteConfig cfg = getConfig(); HoodieWriteConfig cfg = getConfig();
@@ -210,13 +239,12 @@ public class TestHoodieClient implements Serializable {
// Check that the incremental consumption from time 000 // Check that the incremental consumption from time 000
assertEquals("Incremental consumption from time 002, should give all records in commit 004", assertEquals("Incremental consumption from time 002, should give all records in commit 004",
readClient.readCommit(newCommitTime).count(), readClient.readCommit(newCommitTime).count(),
readClient.readSince("002").count()); readClient.readSince("002").count());
assertEquals("Incremental consumption from time 001, should give all records in commit 004", assertEquals("Incremental consumption from time 001, should give all records in commit 004",
readClient.readCommit(newCommitTime).count(), readClient.readCommit(newCommitTime).count(),
readClient.readSince("001").count()); readClient.readSince("001").count());
} }
@Test @Test
public void testInsertAndCleanByVersions() throws Exception { public void testInsertAndCleanByVersions() throws Exception {
int maxVersions = 2; // keep upto 2 versions for each file int maxVersions = 2; // keep upto 2 versions for each file
@@ -513,6 +541,8 @@ public class TestHoodieClient implements Serializable {
HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13)); HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
} }
@Test @Test
public void testSmallInsertHandling() throws Exception { public void testSmallInsertHandling() throws Exception {