From e4e3395f3e8edbe236ad49b72eafbb1dffdaefda Mon Sep 17 00:00:00 2001 From: prazanna Date: Tue, 10 Jan 2017 22:14:40 -0800 Subject: [PATCH] Make commit a public method. Introduce a auto-commit config. Relates issue https://github.com/uber/hoodie/issues/58 (#60) --- .../com/uber/hoodie/HoodieWriteClient.java | 26 +++++++----- .../uber/hoodie/config/HoodieWriteConfig.java | 15 +++++++ .../com/uber/hoodie/TestHoodieClient.java | 40 ++++++++++++++++--- 3 files changed, 67 insertions(+), 14 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index f0a6df3d6..286174c2c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -206,10 +206,7 @@ public class HoodieWriteClient implements Seriali // Update the index back. JavaRDD resultRDD = index.updateLocation(upsertStatusRDD, metadata); resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel()); - boolean commitResult = commit(commitTime, resultRDD); - if (!commitResult) { - throw new HoodieCommitException("Failed to commit " + commitTime); - } + commitOnAutoCommit(commitTime, resultRDD); return resultRDD; } catch (Throwable e) { if (e instanceof HoodieUpsertException) { @@ -219,6 +216,18 @@ public class HoodieWriteClient implements Seriali } } + private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD) { + if(config.shouldAutoCommit()) { + logger.info("Auto commit enabled: Committing " + commitTime); + boolean commitResult = commit(commitTime, resultRDD); + if (!commitResult) { + throw new HoodieCommitException("Failed to commit " + commitTime); + } + } else { + logger.info("Auto commit disabled for " + commitTime); + } + } + private JavaRDD> combineOnCondition(boolean condition, JavaRDD> records, int parallelism) { if(condition) { @@ -271,10 +280,7 @@ public class HoodieWriteClient implements 
Seriali JavaRDD statuses = index.updateLocation(writeStatusRDD, metadata); // Trigger the insert and collect statuses statuses = statuses.persist(config.getWriteStatusStorageLevel()); - boolean commitResult = commit(commitTime, statuses); - if (!commitResult) { - throw new HoodieCommitException("Failed to commit " + commitTime); - } + commitOnAutoCommit(commitTime, statuses); return statuses; } catch (Throwable e) { if (e instanceof HoodieInsertException) { @@ -287,7 +293,8 @@ public class HoodieWriteClient implements Seriali /** * Commit changes performed at the given commitTime marker */ - private boolean commit(String commitTime, JavaRDD writeStatuses) { + public boolean commit(String commitTime, JavaRDD writeStatuses) { + logger.info("Committing " + commitTime); Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime)); try { @@ -331,6 +338,7 @@ public class HoodieWriteClient implements Seriali writeContext = null; } } + logger.info("Status of the commit " + commitTime + ": " + success); return success; } catch (IOException e) { throw new HoodieCommitException( diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index c050294fe..c439f6d7e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -46,6 +46,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true"; private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level"; private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; + private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit"; + private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true"; + private HoodieWriteConfig(Properties
props) { super(props); @@ -66,6 +69,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(TABLE_NAME); } + public Boolean shouldAutoCommit() { + return Boolean.parseBoolean(props.getProperty(HOODIE_AUTO_COMMIT_PROP)); + } + public int getInsertShuffleParallelism() { return Integer.parseInt(props.getProperty(INSERT_PARALLELISM)); } @@ -211,6 +218,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isStorageConfigSet = false; private boolean isCompactionConfigSet = false; private boolean isMetricsConfigSet = false; + private boolean isAutoCommit = true; public Builder fromFile(File propertiesFile) throws IOException { FileReader reader = new FileReader(propertiesFile); @@ -279,6 +287,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withAutoCommit(boolean autoCommit) { + props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit)); + return this; + } + public HoodieWriteConfig build() { HoodieWriteConfig config = new HoodieWriteConfig(props); // Check for mandatory properties @@ -293,6 +306,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { COMBINE_BEFORE_UPSERT_PROP, DEFAULT_COMBINE_BEFORE_UPSERT); setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL); + setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), + HOODIE_AUTO_COMMIT_PROP, DEFAULT_HOODIE_AUTO_COMMIT); setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 7b84abe72..6bcc4c56b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -148,6 +148,35 @@ public class TestHoodieClient 
implements Serializable { assertTrue(result.size() == 25); } + @Test + public void testAutoCommit() throws Exception { + // Set autoCommit false + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + + String newCommitTime = "001"; + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + JavaRDD result = client.insert(writeRecords, newCommitTime); + + assertFalse("If Autocommit is false, then commit should not be made automatically", + HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); + assertTrue("Commit should succeed", client.commit(newCommitTime, result)); + assertTrue("After explicit commit, commit file should be created", + HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); + + newCommitTime = "002"; + records = dataGen.generateUpdates(newCommitTime, 100); + JavaRDD updateRecords = jsc.parallelize(records, 1); + result = client.upsert(updateRecords, newCommitTime); + assertFalse("If Autocommit is false, then commit should not be made automatically", + HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); + assertTrue("Commit should succeed", client.commit(newCommitTime, result)); + assertTrue("After explicit commit, commit file should be created", + HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); + } + @Test public void testUpserts() throws Exception { HoodieWriteConfig cfg = getConfig(); @@ -210,13 +239,12 @@ public class TestHoodieClient implements Serializable { // Check that the incremental consumption from time 000 assertEquals("Incremental consumption from time 002, should give all records in commit 004", - readClient.readCommit(newCommitTime).count(), - readClient.readSince("002").count()); + readClient.readCommit(newCommitTime).count(), + readClient.readSince("002").count()); assertEquals("Incremental consumption from time 001, should give all records in commit 004", -
readClient.readCommit(newCommitTime).count(), - readClient.readSince("001").count()); + readClient.readCommit(newCommitTime).count(), + readClient.readSince("001").count()); } - @Test public void testInsertAndCleanByVersions() throws Exception { int maxVersions = 2; // keep upto 2 versions for each file @@ -513,6 +541,8 @@ public class TestHoodieClient implements Serializable { HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13)); } + + @Test public void testSmallInsertHandling() throws Exception {