diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index d98da346e..c2089466f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -852,18 +852,13 @@ public abstract class AbstractHoodieWriteClient
   getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
index 80191d4c3..a20469429 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.client.heartbeat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -30,9 +29,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Helper class to delete heartbeat for completed or failed instants with expired heartbeats.
@@ -55,6 +51,8 @@ public class HeartbeatUtils {
       deleted = fs.delete(new Path(heartbeatFolderPath + Path.SEPARATOR + instantTime), false);
       if (!deleted) {
         LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      } else {
+        LOG.info("Deleted the heartbeat for instant " + instantTime);
       }
     } catch (IOException io) {
       LOG.error("Unable to delete heartbeat for instant " + instantTime, io);
@@ -63,20 +61,19 @@ public class HeartbeatUtils {
   }
 
   /**
-   * Deletes the heartbeat files for instants with expired heartbeats without any active instant.
-   * @param allExistingHeartbeatInstants
-   * @param metaClient
-   * @param basePath
+   * Deletes the heartbeat file for the specified instant.
+   * @param fs Hadoop FileSystem instance
+   * @param basePath Hoodie table base path
+   * @param instantTime Commit instant time
+   * @param config HoodieWriteConfig instance
+   * @return Boolean indicating whether heartbeat file was deleted or not
    */
-  public static void cleanExpiredHeartbeats(List<String> allExistingHeartbeatInstants,
-                                            HoodieTableMetaClient metaClient, String basePath) {
-    Set<String> nonExpiredHeartbeatInstants = metaClient.getActiveTimeline()
-        .filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-    allExistingHeartbeatInstants.stream().forEach(instant -> {
-      if (!nonExpiredHeartbeatInstants.contains(instant)) {
-        deleteHeartbeatFile(metaClient.getFs(), basePath, instant);
-      }
-    });
+  public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, String instantTime, HoodieWriteConfig config) {
+    if (config.getFailedWritesCleanPolicy().isLazy()) {
+      return deleteHeartbeatFile(fs, basePath, instantTime);
+    }
+
+    return false;
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index c70a2cf6a..cef6641f1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.client;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
@@ -136,6 +138,62 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
+  public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      setUpMORTestTable();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+    properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
+
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .build())
+        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(FileSystemBasedLockProviderTestClass.class)
+            .build())
+        .withAutoCommit(false)
+        .withProperties(properties)
+        .build();
+
+    // Create the first commit
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    createCommitWithInsertsForPartition(cfg, client, "000", "001", 100, "2016/03/01");
+
+    int numConcurrentWriters = 5;
+    ExecutorService executors = Executors.newFixedThreadPool(numConcurrentWriters);
+
+    List<Future<?>> futures = new ArrayList<>(numConcurrentWriters);
+    for (int loop = 0; loop < numConcurrentWriters; loop++) {
+      String newCommitTime = "00" + (loop + 2);
+      String partition = "2016/03/0" + (loop + 2);
+      futures.add(executors.submit(() -> {
+        try {
+          SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg);
+          createCommitWithInsertsForPartition(cfg, writeClient, "001", newCommitTime, 100, partition);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }));
+    }
+
+    futures.forEach(f -> {
+      try {
+        f.get();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
   private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
     // create inserts X 1
     if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -294,6 +352,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     }
   }
 
+  private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client,
+                                                   String prevCommitTime, String newCommitTime, int numRecords,
+                                                   String partition) throws Exception {
+    JavaRDD<WriteStatus> result = insertBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::insert,
+        false, false, numRecords, numRecords, 1, Option.of(partition));
+    assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
+  }
+
   private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
                                        String prevCommitTime, String newCommitTime, int numRecords) throws Exception {
     // Finish first base commmit
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 9bca10892..dda396a13 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -303,7 +303,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
     // Insert with original schema is allowed now
     insertBatch(hoodieWriteConfig, client, "009", "008", numRecords, SparkRDDWriteClient::insert,
-        false, false, 0, 0, 0);
+        false, false, 0, 0, 0, Option.empty());
     checkLatestDeltaCommit("009");
     checkReadRecords("000", 3 * numRecords);
   }
@@ -438,7 +438,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
     // Insert with original schema is allowed now
     insertBatch(hoodieWriteConfig, client, "007", "003", numRecords, SparkRDDWriteClient::insert,
-        false, true, numRecords, 2 * numRecords, 1);
+        false, true, numRecords, 2 * numRecords, 1, Option.empty());
     checkReadRecords("000", 2 * numRecords);
 
     // Update with original schema is allowed now
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 0a010dde5..05d4b5c7d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -244,6 +244,18 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     };
   }
 
+  private Function3<List<HoodieRecord>, String, Integer, String> wrapRecordsGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction) {
+    return (commit, numRecords, partition) -> {
+      final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
+      List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords, partition);
+      final HoodieTableMetaClient metaClient =
+          HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
+      HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+      JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
+      return taggedRecords.collect();
+    };
+  }
+
   /**
    * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys
    * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
@@ -285,6 +297,15 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
     }
   }
 
+  public Function3<List<HoodieRecord>, String, Integer, String> generateWrapRecordsForPartitionFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig, Function3<List<HoodieRecord>, String, Integer, String> wrapped) {
+    if (isPreppedAPI) {
+      return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
+    }
+  }
+
   /**
    * Generate wrapper for delete key generation function for testing Prepped APIs.
    *
@@ -355,12 +376,22 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
   public JavaRDD<WriteStatus> insertBatch(HoodieWriteConfig writeConfig, SparkRDDWriteClient client, String newCommitTime,
                                           String initCommitTime, int numRecordsInThisCommit,
                                           Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
-                                          boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
-        generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+                                          boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, Option<String> partition) throws Exception {
 
-    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
-        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
+    if (partition.isPresent()) {
+      final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction =
+          generateWrapRecordsForPartitionFn(isPreppedAPI, writeConfig, dataGen::generateInsertsForPartition);
+
+      return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+          recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false,
+          partition.get());
+    } else {
+      final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+          generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+      return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+          recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
+    }
   }
 
   public JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, SparkRDDWriteClient client, String newCommitTime,
@@ -453,6 +484,16 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
         writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, doCommit, true);
   }
 
+  public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
+                                         Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+                                         Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
+                                         Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
+                                         boolean doCommit, String partition) throws Exception {
+    return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, numRecordsInThisCommit, recordGenFunction,
+        writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, doCommit, true, partition);
+  }
+
   /**
    * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
    *
@@ -478,10 +519,35 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
                                         boolean filterForCommitTimeWithAssert) throws Exception {
+    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+        expTotalCommits, doCommit, filterForCommitTimeWithAssert);
+  }
+
+  public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
+                                         Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+                                         Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
+                                         Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+                                         boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
+                                         boolean filterForCommitTimeWithAssert,
+                                         String partition) throws Exception {
+
+    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit, partition);
+    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+        expTotalCommits, doCommit, filterForCommitTimeWithAssert);
+  }
+
+  private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
+                                                Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+                                                int numRecordsInThisCommit, List<HoodieRecord> records,
+                                                Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+                                                boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
+                                                int expTotalCommits, boolean doCommit, boolean filterForCommitTimeWithAssert) throws IOException {
     // Write 1 (only inserts)
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
 
     JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
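For reference, a minimal caller sketch (not part of the patch above) of the new config-aware HeartbeatUtils.deleteHeartbeatFile(FileSystem, String, String, HoodieWriteConfig) overload introduced in the HeartbeatUtils hunk; the enclosing class, method, and variable names here are illustrative assumptions, not code from the repository.

// Illustrative sketch only: exercises the overload added above. Per the patch, the heartbeat
// file is deleted only when the configured failed-writes cleaning policy is LAZY; under any
// other policy the method is a no-op and returns false, and HeartbeatUtils logs the outcome.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.config.HoodieWriteConfig;

class HeartbeatCleanupSketch {
  static boolean cleanUpHeartbeat(FileSystem fs, String basePath, String instantTime, HoodieWriteConfig config) {
    // Returns false when the policy is not LAZY or the underlying delete did not succeed.
    return HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instantTime, config);
  }
}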