1
0

[HUDI-1350] Support Partition level delete API in HUDI (#2254)

* [HUDI-1350] Support Partition level delete API in HUDI

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction
This commit is contained in:
lw0090
2020-12-29 07:01:06 +08:00
committed by GitHub
parent 6cdf59d92b
commit e177466fd2
11 changed files with 228 additions and 16 deletions

View File

@@ -1103,7 +1103,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
* Test scenario of writing similar number file groups in partition.
*/
@Test
public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords() throws Exception {
public void testInsertOverwritePartitionHandlingWithSimilarNumberOfRecords() throws Exception {
verifyInsertOverwritePartitionHandling(3000, 3000);
}
@@ -1143,6 +1143,109 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
}
/**
 * Test scenario of writing fewer file groups for the first partition than the second and third partitions.
 */
@Test
public void verifyDeletePartitionsHandlingWithFewerRecordsFirstPartition() throws Exception {
verifyDeletePartitionsHandling(1000, 3000, 3000);
}
/**
 * Test scenario of writing a similar number of file groups in every partition.
 */
@Test
public void verifyDeletePartitionsHandlingWithSimilarNumberOfRecords() throws Exception {
verifyDeletePartitionsHandling(3000, 3000, 3000);
}
/**
 * Test scenario of writing more file groups for the first partition than the second and third partitions.
 */
@Test
public void verifyDeletePartitionsHandlingWithFewerRecordsSecondThirdPartition() throws Exception {
  // NOTE: the duplicated word in the old name ("HandlingHandling") was a typo;
  // a @Test method has no callers, so the rename is safe.
  verifyDeletePartitionsHandling(3000, 1000, 1000);
}
/**
 * Inserts {@code recordsCount} generated records into {@code partitionPath} as the given commit,
 * verifies the written records, and returns the set of file ids that were written.
 */
private Set<String> insertPartitionRecordsWithCommit(SparkRDDWriteClient client, int recordsCount, String commitTime1, String partitionPath) {
  client.startCommitWithTime(commitTime1);
  List<HoodieRecord> generatedRecords = dataGen.generateInsertsForPartition(commitTime1, recordsCount, partitionPath);
  JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(generatedRecords, 2);
  List<WriteStatus> writeStatuses = client.upsert(recordsRDD, commitTime1).collect();
  assertNoWriteErrors(writeStatuses);
  Set<String> writtenFileIds = writeStatuses.stream()
      .map(WriteStatus::getFileId)
      .collect(Collectors.toSet());
  verifyRecordsWritten(commitTime1, generatedRecords, writeStatuses);
  return writtenFileIds;
}
/**
 * Issues a delete-partitions replace-commit for {@code deletePartitionPath} and returns
 * the file ids that the commit marked as replaced, flattened across all partitions.
 */
private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
  client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
  HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
  return writeResult.getPartitionToReplaceFileIds()
      .values()
      .stream()
      .flatMap(List::stream)
      .collect(Collectors.toSet());
}
/**
 * End-to-end check of the delete-partitions API:
 * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records for first partition.
 * 2) Do write2 (upsert) with 'batch2RecordsCount' number of records for second partition.
 * 3) Do write3 (upsert) with 'batch3RecordsCount' number of records for third partition.
 * 4) delete first partition and check result.
 * 5) delete second and third partition and check result.
 *
 */
private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
// Small-file config so record counts translate into a predictable number of file groups.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
dataGen = new HoodieTestDataGenerator();
// Do Inserts for DEFAULT_FIRST_PARTITION_PATH
String commitTime1 = "001";
Set<String> batch1Buckets =
this.insertPartitionRecordsWithCommit(client, batch1RecordsCount, commitTime1, DEFAULT_FIRST_PARTITION_PATH);
// Do Inserts for DEFAULT_SECOND_PARTITION_PATH
String commitTime2 = "002";
Set<String> batch2Buckets =
this.insertPartitionRecordsWithCommit(client, batch2RecordsCount, commitTime2, DEFAULT_SECOND_PARTITION_PATH);
// Do Inserts for DEFAULT_THIRD_PARTITION_PATH
String commitTime3 = "003";
Set<String> batch3Buckets =
this.insertPartitionRecordsWithCommit(client, batch3RecordsCount, commitTime3, DEFAULT_THIRD_PARTITION_PATH);
// delete DEFAULT_FIRST_PARTITION_PATH
String commitTime4 = "004";
Set<String> deletePartitionReplaceFileIds1 =
deletePartitionWithCommit(client, commitTime4, Arrays.asList(DEFAULT_FIRST_PARTITION_PATH));
// The replace-commit metadata must cover exactly the file ids written to the deleted partition.
assertEquals(batch1Buckets, deletePartitionReplaceFileIds1);
// After the delete, no latest base files should remain in the first partition...
List<HoodieBaseFile> baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH));
assertEquals(0, baseFiles.size());
// ...while the untouched second and third partitions still have their data.
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH));
assertTrue(baseFiles.size() > 0);
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
assertTrue(baseFiles.size() > 0);
// delete DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH
String commitTime5 = "005";
Set<String> deletePartitionReplaceFileIds2 =
deletePartitionWithCommit(client, commitTime5, Arrays.asList(DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH));
// Deleting two partitions in one call must replace the union of both batches' file ids.
Set<String> expectedFileId = new HashSet<>();
expectedFileId.addAll(batch2Buckets);
expectedFileId.addAll(batch3Buckets);
assertEquals(expectedFileId, deletePartitionReplaceFileIds2);
// Finally, no latest base files should remain anywhere in the table.
baseFiles = HoodieClientTestUtils.getLatestBaseFiles(basePath, fs,
String.format("%s/%s/*", basePath, DEFAULT_FIRST_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_SECOND_PARTITION_PATH),
String.format("%s/%s/*", basePath, DEFAULT_THIRD_PARTITION_PATH));
assertEquals(0, baseFiles.size());
}
/**
* Verify data in parquet files matches expected records and commit time.
*/

View File

@@ -147,6 +147,22 @@ public class HoodieClientTestUtils {
}
}
/**
 * Returns the latest base files matching the given glob paths under a hoodie table,
 * considering only completed commits.
 *
 * @param basePath base path of the hoodie table
 * @param fs       filesystem used to resolve the globs and read table metadata
 * @param paths    glob patterns (one per partition, typically) to list base files under
 * @return the latest base files across all matched paths
 * @throws HoodieException if the table metadata or file listing cannot be read
 */
public static List<HoodieBaseFile> getLatestBaseFiles(String basePath, FileSystem fs,
    String... paths) {
  List<HoodieBaseFile> latestFiles = new ArrayList<>();
  try {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
    for (String path : paths) {
      // Build a file-system view restricted to the files matching this glob,
      // backed by the completed-instants timeline only.
      BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
          metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
      latestFiles.addAll(fileSystemView.getLatestBaseFiles().collect(Collectors.toList()));
    }
  } catch (Exception e) {
    // Message previously said "reading hoodie table as a dataframe" — copy-pasted
    // from readPaths(); this method only lists latest base files.
    throw new HoodieException("Error obtaining latest base files for table at path " + basePath, e);
  }
  return latestFiles;
}
/**
* Reads the paths under a hoodie table out as a DataFrame.
*/
@@ -154,14 +170,9 @@ public class HoodieClientTestUtils {
String... paths) {
List<String> filteredPaths = new ArrayList<>();
try {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
for (String path : paths) {
BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
for (HoodieBaseFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, fs, paths);
for (HoodieBaseFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
return sqlContext.read().parquet(filteredPaths.toArray(new String[filteredPaths.size()]));
} catch (Exception e) {