From 21bb1b505a0a433bdbe367f7f252afdddafafc96 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Mon, 13 Jul 2020 22:34:07 -0400
Subject: [PATCH] [HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)

---
 .../index/bloom/HoodieGlobalBloomIndex.java   |   8 +-
 .../index/simple/HoodieGlobalSimpleIndex.java |   8 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java | 213 ++++++++++++++----
 .../hudi/testutils/HoodieClientTestUtils.java |  21 ++
 4 files changed, 202 insertions(+), 48 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 1e57a3891..4f93b3029 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -123,11 +123,13 @@ public class HoodieGlobalBloomIndex extends HoodieBloomIndex
       if (config.getBloomIndexUpdatePartitionPath()
           && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
         // Create an empty record to delete the record in the old partition
-        HoodieRecord emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+        HoodieRecord deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
             new EmptyHoodieRecordPayload());
+        deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+        deleteRecord.seal();
         // Tag the incoming record for inserting to the new partition
-        HoodieRecord taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
-        return Arrays.asList(emptyRecord, taggedRecord).iterator();
+        HoodieRecord insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+        return Arrays.asList(deleteRecord, insertRecord).iterator();
       } else {
         // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
         // When it differs, the record will still be updated at its old partition.
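The hunk above is the heart of the fix: the delete record is only effective if it is tagged with the location of the file that currently holds the key. Untagged, a record carrying an EmptyHoodieRecordPayload is routed as a fresh insert, its empty payload produces no row, and the stale copy in the old partition is never removed. A minimal sketch of the tagging pattern, using only the record API already visible in the hunk (the class and method names here are illustrative, not part of the patch):

import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

public class TaggedDeleteSketch {

  // Builds the delete ("tombstone") record emitted for the old partition.
  // Tagging it with the existing location routes it to the writer as an
  // update of the file that currently holds the record, so the merge drops
  // the old row instead of treating the empty payload as a no-op insert.
  static HoodieRecord tombstoneFor(HoodieKey oldKey, HoodieRecordLocation existingLocation) {
    HoodieRecord deleteRecord = new HoodieRecord(oldKey, new EmptyHoodieRecordPayload());
    deleteRecord.setCurrentLocation(existingLocation);
    deleteRecord.seal(); // prevent further location changes
    return deleteRecord;
  }
}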
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index bb1d8d69b..990f02dce 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -130,10 +130,12 @@ public class HoodieGlobalSimpleIndex extends HoodieSimpleIndex
       HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
       if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
         // Create an empty record to delete the record in the old partition
-        HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+        HoodieRecord deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+        deleteRecord.setCurrentLocation(location);
+        deleteRecord.seal();
         // Tag the incoming record for inserting to the new partition
-        HoodieRecord taggedRecord = (HoodieRecord) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
-        taggedRecords = Arrays.asList(emptyRecord, taggedRecord);
+        HoodieRecord insertRecord = (HoodieRecord) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
+        taggedRecords = Arrays.asList(deleteRecord, insertRecord);
       } else {
         // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
         // When it differs, the record will still be updated at its old partition.
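The same tagging fix is applied to the global simple index. Both code paths are gated by config flags, so the new behavior only kicks in when the caller opts in. A hedged sketch of the write config a client would build, using the builder methods the test below exercises (`withPath` and the surrounding class are assumptions for illustration; `basePath` is supplied by the caller):

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;

public class UpdatePartitionPathConfigSketch {

  // Builds a write config with update-partition-path enabled for a global index.
  static HoodieWriteConfig buildConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(IndexType.GLOBAL_BLOOM)          // or IndexType.GLOBAL_SIMPLE
            .withBloomIndexUpdatePartitionPath(true)        // read by the global bloom index
            .withGlobalSimpleIndexUpdatePartitionPath(true) // read by the global simple index
            .build())
        .build();
  }
}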
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 1f86bb26d..5518a3f4a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
@@ -55,7 +56,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -73,6 +78,9 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
 import static org.apache.hudi.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
 import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -143,7 +151,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    * @throws Exception in case of failure
    */
   private void testAutoCommit(Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                              boolean isPrepped) throws Exception {
+      boolean isPrepped) throws Exception {
     // Set autoCommit false
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -275,12 +283,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   /**
    * Test one of HoodieWriteClient upsert(Prepped) APIs.
    *
-   * @param config Write Config
+   * @param config  Write Config
    * @param writeFn One of Hoodie Write Function API
    * @throws Exception in case of error
    */
   private void testUpsertsInternal(HoodieWriteConfig config,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
       throws Exception {
     // Force using older timeline layout
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
@@ -400,56 +408,177 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests that when update partition path is enabled for a global index, the existing record in the old partition is deleted appropriately.
    */
-  @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
+  public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
+    testUpsertsUpdatePartitionPath(indexType, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures that, for a global index with update partition path set to true in the config, an incoming record whose
+   * partition does not match what is in storage is handled appropriately: the old record is deleted in the old partition and
+   * the new one is inserted in the new partition.
+   * test structure:
+   * 1. insert 1st batch
+   * 2. insert 2nd batch with a larger number of records so that new file groups are created for the partitions
+   * 3. issue upserts to records from batch 1 with a different partition path; this should ensure records from batch 1 are
+   *    deleted and new records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config    instance of {@link HoodieWriteConfig} to use
+   * @param writeFn   write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
     String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
-
-    // Write 1 (only inserts)
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    Set<Pair<String, String>> expectedPartitionPathRecKeyPairs = new HashSet<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+
+    // verify one base file per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
+    }
+
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+
+    // verify that some partition now has more than one base file: we can't
+    // guarantee how records are distributed across partitions, so verify that
+    // at least one partition has more than one base file
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
+        "At least one partition should have more than 1 base file after the 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      // populate expected partition path and record keys
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
+
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+  }
+
+  private void assertPartitionPathRecordKeys(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
+    Dataset<Row> rows = getAllRows(fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+    // verify all (partition path, record key) pairs match
+    assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+  }
+
+  private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<Row> rows) {
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+    return actualPartitionPathRecKeyPairs;
+  }
+
+  private Dataset<Row> getAllRows(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+  }
+
+  private String[] getFullPartitionPaths() {
     String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    return fullPartitionPaths;
+  }
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
-    client.startCommitWithTime(newCommitTime);
+  private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+  }
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
-
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
-
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
-    // Check the entire dataset has all records still
-    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+  private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+    // verify the sizes match, then that every pair appears on both sides
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
   }
 
   /**
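The third write in the test above moves every batch-1 record to the "next" partition through the if/else chain; the same mapping as a standalone sketch (the class and method names are illustrative, the constants are the HoodieTestDataGenerator ones the test imports):

import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;

public class PartitionRotationSketch {

  // Maps each of the generator's three default partitions to the "next" one,
  // mirroring the rotation in the third write of the test above. Because every
  // key changes partition, a correct global index must delete all old copies.
  static String rotateToNextPartition(String partitionPath) {
    if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
      return DEFAULT_SECOND_PARTITION_PATH;
    } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
      return DEFAULT_THIRD_PARTITION_PATH;
    } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
      return DEFAULT_FIRST_PARTITION_PATH;
    }
    throw new IllegalStateException("Unknown partition path " + partitionPath);
  }
}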
@@ -715,7 +844,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, HoodieWriteClient client,
-                                                            int sizeToInsertAndUpdate, int expectedTotalRecords)
+      int sizeToInsertAndUpdate, int expectedTotalRecords)
       throws IOException {
     client.startCommitWithTime(instantTime);
     List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
@@ -740,7 +869,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private void testDeletes(HoodieWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
-                           String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
+      String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
     client.startCommitWithTime(instantTime);
 
     List<HoodieKey> hoodieKeysToDelete = HoodieClientTestUtils
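The verification helpers added above all funnel through HoodieClientTestUtils.read, which loads every record under the given partition globs as a Spark dataset; each row then exposes Hudi's metadata column _hoodie_partition_path next to the test generator's _row_key field, which is what makes the (partition path, record key) comparison possible. A hedged sketch of that read path, with the harness fields passed in explicitly (the wrapper class and method are illustrative):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class ReadBackSketch {

  // Reads every record under the given "<basePath>/<partition>/*" globs; the
  // returned rows carry Hudi metadata columns (e.g. _hoodie_partition_path)
  // alongside data fields such as the generator's _row_key.
  static Dataset<Row> readAll(JavaSparkContext jsc, String basePath, SQLContext sqlContext,
      FileSystem fs, String... fullPartitionPaths) {
    return HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
  }
}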
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 145cde541..1bb827510 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -67,6 +67,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -223,6 +224,26 @@ public class HoodieClientTestUtils {
     }
   }
 
+  /**
+   * Returns the number of latest base files under each of the given paths.
+   */
+  public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
+      String... paths) {
+    Map<String, Integer> toReturn = new HashMap<>();
+    try {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+      for (String path : paths) {
+        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+        List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
+        toReturn.put(path, latestFiles.size());
+      }
+      return toReturn;
+    } catch (Exception e) {
+      throw new HoodieException("Error counting base files for the given paths", e);
+    }
+  }
+
   public static String writeParquetFile(String basePath, String partitionPath, String filename,
       List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
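A hedged usage sketch for the new helper, assuming the Map<String, Integer> signature reconstructed above: after the test's second batch of inserts, at least one partition glob is expected to map to more than one latest base file (the wrapper class and method names are illustrative):

import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.testutils.HoodieClientTestUtils;

public class BaseFileCountSketch {

  // True when at least one partition glob maps to more than one latest base
  // file -- the condition the test asserts after its second batch of writes.
  static boolean anyPartitionHasMultipleBaseFiles(String basePath, FileSystem fs,
      String... fullPartitionPaths) {
    Map<String, Integer> counts =
        HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
    return counts.values().stream().anyMatch(count -> count > 1);
  }
}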