[HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)

commit 21bb1b505a (parent 10e457278b)
Author: Sivabalan Narayanan
Date: 2020-07-13 22:34:07 -04:00 (committed by GitHub)
4 changed files with 202 additions and 48 deletions
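When update-partition-path is enabled on a global index and an incoming record's partition differs from the one already in storage, both global indexes emit two records: a delete for the copy in the old partition and an insert into the new one. Previously the empty delete record carried no file location; this commit tags it with the record's existing location and seals it so the delete in the old partition takes effect. A minimal sketch of the pattern applied in both index hunks below (existingKey, existingLocation and incoming are placeholder names, not identifiers from the patch):

// delete the stale copy, routed to the file group that currently holds it
HoodieRecord<T> deleteRecord = new HoodieRecord<>(existingKey, new EmptyHoodieRecordPayload());
deleteRecord.setCurrentLocation(existingLocation);
deleteRecord.seal();
// the incoming record is tagged with an empty location so it is inserted into the new partition
HoodieRecord<T> insertRecord = HoodieIndexUtils.getTaggedRecord(incoming, Option.empty());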


@@ -123,11 +123,13 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
       if (config.getBloomIndexUpdatePartitionPath()
           && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
         // Create an empty record to delete the record in the old partition
-        HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+        HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
             new EmptyHoodieRecordPayload());
+        deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+        deleteRecord.seal();
         // Tag the incoming record for inserting to the new partition
-        HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
-        return Arrays.asList(emptyRecord, taggedRecord).iterator();
+        HoodieRecord<T> insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+        return Arrays.asList(deleteRecord, insertRecord).iterator();
       } else {
         // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
         // When it differs, the record will still be updated at its old partition.


@@ -130,10 +130,12 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends Hood
     HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
     if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
       // Create an empty record to delete the record in the old partition
-      HoodieRecord<T> emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+      HoodieRecord<T> deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+      deleteRecord.setCurrentLocation(location);
+      deleteRecord.seal();
       // Tag the incoming record for inserting to the new partition
-      HoodieRecord<T> taggedRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
-      taggedRecords = Arrays.asList(emptyRecord, taggedRecord);
+      HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
+      taggedRecords = Arrays.asList(deleteRecord, insertRecord);
     } else {
       // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
       // When it differs, the record will still be updated at its old partition.
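Both code paths above are opt-in: they run only when update-partition-path is enabled for the global bloom or global simple index. A minimal sketch of a write config that turns the behavior on, mirroring what the new test in this commit builds (basePath is a placeholder; unrelated options are omitted):

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(IndexType.GLOBAL_BLOOM)           // or IndexType.GLOBAL_SIMPLE
        .withBloomIndexUpdatePartitionPath(true)         // read by HoodieGlobalBloomIndex
        .withGlobalSimpleIndexUpdatePartitionPath(true)  // read by HoodieGlobalSimpleIndex
        .build())
    .build();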


@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
@@ -55,7 +56,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;

 import java.io.FileInputStream;
 import java.io.IOException;
@@ -73,6 +78,9 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
 import static org.apache.hudi.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
 import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -143,7 +151,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    * @throws Exception in case of failure
    */
   private void testAutoCommit(Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-      boolean isPrepped) throws Exception {
+                              boolean isPrepped) throws Exception {
     // Set autoCommit false
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -275,12 +283,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   /**
    * Test one of HoodieWriteClient upsert(Prepped) APIs.
    *
-   * @param config Write Config
+   * @param config  Write Config
    * @param writeFn One of Hoodie Write Function API
    * @throws Exception in case of error
    */
   private void testUpsertsInternal(HoodieWriteConfig config,
-      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
+                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
       throws Exception {
     // Force using older timeline layout
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
@@ -400,56 +408,177 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
-  @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
+  public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
+    testUpsertsUpdatePartitionPath(indexType, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
+                                              Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+    // Write 1
     String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+    // Write 1 (only inserts)
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    Set<Pair<String, String>> expectedPartitionPathRecKeyPairs = new HashSet<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
     assertNoWriteErrors(statuses);
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
+    }
+    assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1));
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
+        "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      // populate expected partition path and record keys
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+  }
+  private void assertPartitionPathRecordKeys(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
+    Dataset<Row> rows = getAllRows(fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+    // verify all partitionpath, record key matches
+    assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+  }
+  private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+    return actualPartitionPathRecKeyPairs;
+  }
+  private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+  }
+  private String[] getFullPartitionPaths() {
     String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    return fullPartitionPaths;
+  }
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
-    client.startCommitWithTime(newCommitTime);
+  private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+  }
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
-    // Check the entire dataset has all records still
-    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+  private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+                                                                    List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
   }
   /**
@@ -715,7 +844,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
   private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, HoodieWriteClient client,
-      int sizeToInsertAndUpdate, int expectedTotalRecords)
+                                                            int sizeToInsertAndUpdate, int expectedTotalRecords)
       throws IOException {
     client.startCommitWithTime(instantTime);
     List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
@@ -740,7 +869,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
   private void testDeletes(HoodieWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
-      String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
+                           String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
     client.startCommitWithTime(instantTime);
     List<HoodieKey> hoodieKeysToDelete = HoodieClientTestUtils


@@ -67,6 +67,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -223,6 +224,26 @@ public class HoodieClientTestUtils {
     }
   }
+  /**
+   * Find total basefiles for passed in paths.
+   */
+  public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
+                                                              String... paths) {
+    Map<String, Integer> toReturn = new HashMap<>();
+    try {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+      for (String path : paths) {
+        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+        List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
+        toReturn.put(path, latestFiles.size());
+      }
+      return toReturn;
+    } catch (Exception e) {
+      throw new HoodieException("Error reading hoodie table as a dataframe", e);
+    }
+  }
   public static String writeParquetFile(String basePath, String partitionPath, String filename,
       List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {