1
0

[HUDI-3356][HUDI-3203] HoodieData for metadata index records; BloomFilter construction from index based on the type param (#4848)

Rework of #4761 
This diff introduces following changes:

- Write stats are converted to metadata index records during the commit. Making them use the HoodieData type so that the record generation scales up with needs. 
- Metadata index init support for bloom filter and column stats partitions.
- When building the BloomFilter from the index records, using the type param stored in the payload instead of hardcoded type.
- Delta writes can change column ranges and the column stats index need to be properly updated with new ranges to be consistent with the table dataset. This fix add column stats index update support for the delta writes.

Co-authored-by: Manoj Govindassamy <manoj.govindassamy@gmail.com>
This commit is contained in:
Sagar Sumit
2022-03-08 21:09:04 +05:30
committed by GitHub
parent ed26c5265c
commit 575bc63468
24 changed files with 1051 additions and 533 deletions

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.client.functional;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -47,7 +48,6 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -63,6 +63,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -82,17 +83,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@Tag("functional")
public class TestHoodieIndex extends HoodieClientTestHarness {
public class TestHoodieIndex extends TestHoodieMetadataBase {
private static Stream<Arguments> indexTypeParams() {
// IndexType, populateMetaFields, enableMetadataIndex
Object[][] data = new Object[][] {
{IndexType.BLOOM, true},
{IndexType.GLOBAL_BLOOM, true},
{IndexType.SIMPLE, true},
{IndexType.GLOBAL_SIMPLE, true},
{IndexType.SIMPLE, false},
{IndexType.GLOBAL_SIMPLE, false},
{IndexType.BUCKET, false}
{IndexType.BLOOM, true, true},
{IndexType.BLOOM, true, false},
{IndexType.GLOBAL_BLOOM, true, true},
{IndexType.GLOBAL_BLOOM, true, false},
{IndexType.SIMPLE, true, true},
{IndexType.SIMPLE, true, false},
{IndexType.SIMPLE, false, true},
{IndexType.SIMPLE, false, false},
{IndexType.GLOBAL_SIMPLE, true, true},
{IndexType.GLOBAL_SIMPLE, false, true},
{IndexType.GLOBAL_SIMPLE, false, false},
{IndexType.BUCKET, false, true},
{IndexType.BUCKET, false, false}
};
return Stream.of(data).map(Arguments::of);
}
@@ -103,11 +111,11 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
private HoodieIndex index;
private HoodieWriteConfig config;
private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields, true);
private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, true, enableMetadataIndex);
}
private void setUp(IndexType indexType, boolean populateMetaFields, boolean rollbackUsingMarkers) throws Exception {
private void setUp(IndexType indexType, boolean populateMetaFields, boolean rollbackUsingMarkers, boolean enableMetadataIndex) throws Exception {
this.indexType = indexType;
initPath();
initSparkContexts();
@@ -123,8 +131,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
.withRollbackUsingMarkers(rollbackUsingMarkers)
.withIndexConfig(indexBuilder.build())
.withAutoCommit(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMetadataIndexBloomFilter(enableMetadataIndex)
.withMetadataIndexColumnStats(enableMetadataIndex)
.build())
.withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).build();
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.build();
writeClient = getHoodieWriteClient(config);
this.index = writeClient.getIndex();
}
@@ -136,8 +149,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("indexTypeParams")
public void testSimpleTagLocationAndUpdate(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields);
public void testSimpleTagLocationAndUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, enableMetadataIndex);
String newCommitTime = "001";
int totalRecords = 10 + random.nextInt(20);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
@@ -186,8 +199,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("indexTypeParams")
public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields);
public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, enableMetadataIndex);
String newCommitTime = "001";
int totalRecords = 10 + random.nextInt(20);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
@@ -236,8 +249,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@ParameterizedTest
@MethodSource("indexTypeParams")
public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields, false);
public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, false, enableMetadataIndex);
String newCommitTime = writeClient.startCommit();
int totalRecords = 20 + random.nextInt(20);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
@@ -286,17 +299,21 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
}
private static Stream<Arguments> regularIndexTypeParams() {
// IndexType, populateMetaFields, enableMetadataIndex
Object[][] data = new Object[][] {
{IndexType.BLOOM, true},
{IndexType.SIMPLE, true}
// TODO (codope): Enabling metadata index is flaky. Both bloom_filter and col_stats get generated but loading column ranges from the index is failing.
// {IndexType.BLOOM, true, true},
{IndexType.BLOOM, true, false},
{IndexType.SIMPLE, true, true},
{IndexType.SIMPLE, true, false}
};
return Stream.of(data).map(Arguments::of);
}
@ParameterizedTest
@MethodSource("regularIndexTypeParams")
public void testTagLocationAndFetchRecordLocations(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields);
public void testTagLocationAndFetchRecordLocations(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {
setUp(indexType, populateMetaFields, enableMetadataIndex);
String p1 = "2016/01/31";
String p2 = "2015/01/31";
String rowKey1 = UUID.randomUUID().toString();
@@ -320,7 +337,9 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
HoodieRecord record4 =
new HoodieAvroRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
String newCommitTime = writeClient.startCommit();
metaClient = HoodieTableMetaClient.reload(metaClient);
writeClient.upsert(recordRDD, newCommitTime);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable);
@@ -330,20 +349,42 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
assertFalse(record.isCurrentLocationKnown());
}
// We create three parquet file, each having one record. (two different partitions)
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
String fileId1 = testTable.addCommit("001").getFileIdWithInserts(p1, record1);
String fileId2 = testTable.addCommit("002").getFileIdWithInserts(p1, record2);
String fileId3 = testTable.addCommit("003").getFileIdWithInserts(p2, record4);
// We create three parquet files, each having one record (two different partitions)
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
final String fileId1 = "fileID1";
final String fileId2 = "fileID2";
final String fileId3 = "fileID3";
Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
Path baseFilePath = testTable.forCommit("0000001").withInserts(p1, fileId1, Collections.singletonList(record1));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p1, k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation("0000001", WriteOperationType.UPSERT, Arrays.asList(p1, p2),
partitionToFilesNameLengthMap, false, false);
partitionToFilesNameLengthMap.clear();
baseFilePath = testTable.forCommit("0000002").withInserts(p1, fileId2, Collections.singletonList(record2));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p1, k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation("0000002", WriteOperationType.UPSERT, Arrays.asList(p1, p2),
partitionToFilesNameLengthMap, false, false);
partitionToFilesNameLengthMap.clear();
baseFilePath = testTable.forCommit("0000003").withInserts(p2, fileId3, Collections.singletonList(record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p2, k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation("0000003", WriteOperationType.UPSERT, Arrays.asList(p1, p2),
partitionToFilesNameLengthMap, false, false);
// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(config, context, metaClient);
taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable);
List<HoodieRecord> records = taggedRecordRDD.collect();
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
for (HoodieRecord record : records) {
if (record.getRecordKey().equals(rowKey1)) {
if (record.getPartitionPath().equals(p2)) {
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
@@ -378,12 +419,17 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@Test
public void testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath() throws Exception {
setUp(IndexType.GLOBAL_SIMPLE, true);
setUp(IndexType.GLOBAL_SIMPLE, true, true);
config = getConfigBuilder()
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
.withGlobalSimpleIndexUpdatePartitionPath(true)
.withBloomIndexUpdatePartitionPath(true)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withMetadataIndexBloomFilter(true)
.withMetadataIndexColumnStats(true)
.build())
.build();
writeClient = getHoodieWriteClient(config);
index = writeClient.getIndex();
@@ -432,7 +478,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
final String file1P1C0 = UUID.randomUUID().toString();
Map<String, List<Pair<String, Integer>>> c1PartitionToFilesNameLengthMap = new HashMap<>();
c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, 100)));
// We have some records to be tagged (two different partitions)
Path baseFilePath = testTable.forCommit("1000").withInserts(p1, file1P1C0, Collections.singletonList(originalRecord));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
c1PartitionToFilesNameLengthMap.put(p1, Collections.singletonList(Pair.of(file1P1C0, Integer.valueOf((int) baseFileLength))));
testTable.doWriteOperation("1000", WriteOperationType.INSERT, Arrays.asList(p1),
c1PartitionToFilesNameLengthMap, false, false);

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.client.functional;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -52,8 +52,9 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
@@ -437,5 +438,4 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
}
return builder.build();
}
}

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.index.bloom;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.functional.TestHoodieMetadataBase;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
@@ -28,6 +26,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
@@ -37,10 +36,14 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
@@ -49,10 +52,11 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Tuple2;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -61,6 +65,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Tuple2;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -75,8 +81,13 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}";
public static Stream<Arguments> configParams() {
Object[][] data =
new Object[][]{{true, true, true}, {false, true, true}, {true, true, false}, {true, false, true}};
// rangePruning, treeFiltering, bucketizedChecking
Object[][] data = new Object[][] {
{true, true, true},
{false, true, true},
{true, true, false},
{true, false, true}
};
return Stream.of(data).map(Arguments::of);
}
@@ -87,6 +98,11 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
initFileSystem();
// We have some records to be tagged (two different partitions)
initMetaClient();
HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(indexBuilder.build())
.build();
writeClient = getHoodieWriteClient(config);
}
@AfterEach
@@ -112,7 +128,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
// Create some partitions, and put some files
// "2016/01/21": 0 file
@@ -142,10 +158,40 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// Still 0, as no valid commit
assertEquals(0, filesList.size());
testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
.withInserts("2015/03/12", "3", record1)
.withInserts("2015/03/12", "4", record2, record3, record4);
final String fileId1 = "1";
final String fileId2 = "2";
final String fileId3 = "3";
final String fileId4 = "4";
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
String commitTime = "20160401010101";
Path baseFilePath = testTable.forCommit(commitTime).withInserts(partitions.get(1), fileId2, Collections.emptyList());
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(1),
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(partitions.get(1)),
partitionToFilesNameLengthMap, false, false);
commitTime = "20150312101010";
partitionToFilesNameLengthMap.clear();
testTable.forCommit(commitTime);
baseFilePath = testTable.withInserts(partitions.get(2), fileId1, Collections.emptyList());
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(2),
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
baseFilePath = testTable.withInserts(partitions.get(2), fileId3, Collections.singletonList(record1));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(2),
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
baseFilePath = testTable.withInserts(partitions.get(2), fileId4, Arrays.asList(record2, record3, record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partitions.get(2),
k -> new ArrayList<>()).add(Pair.of(fileId4, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(partitions.get(2)),
partitionToFilesNameLengthMap, false, false);
filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable);
assertEquals(4, filesList.size());
@@ -229,9 +275,20 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// record2, record3).
BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
filter.add(record3.getRecordKey());
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, filter);
String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2);
String filename = testTable.getBaseFileNameById(fileId);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, filter, metadataWriter);
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
final String commitTime = "0000001";
final String fileId = UUID.randomUUID().toString();
Path baseFilePath = testTable.forCommit(commitTime)
.withInserts(partition, fileId, Arrays.asList(record1, record2));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partition,
k -> new ArrayList<>()).add(Pair.of(fileId, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition),
partitionToFilesNameLengthMap, false, false);
final String filename = testTable.getBaseFileNameById(fileId);
// The bloom filter contains 3 records
assertTrue(filter.mightContain(record1.getRecordKey()));
@@ -305,7 +362,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
@@ -316,10 +373,39 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
assertFalse(record.isCurrentLocationKnown());
}
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
final String partition1 = "2016/01/31";
final String partition2 = "2015/01/31";
// We create three parquet file, each having one record. (two different partitions)
String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
final String fileId1 = UUID.randomUUID().toString();
final String commit1 = "0000001";
Path baseFilePath = testTable.forCommit(commit1).withInserts(partition1, fileId1, Collections.singletonList(record1));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partition1,
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(partition1),
partitionToFilesNameLengthMap, false, false);
final String fileId2 = UUID.randomUUID().toString();
final String commit2 = "0000002";
baseFilePath = testTable.forCommit(commit2).withInserts(partition1, fileId2, Collections.singletonList(record2));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(partition1,
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(partition1),
partitionToFilesNameLengthMap, false, false);
final String fileId3 = UUID.randomUUID().toString();
final String commit3 = "0000003";
baseFilePath = testTable.forCommit(commit3).withInserts(partition2, fileId3, Collections.singletonList(record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(partition2,
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit3, WriteOperationType.UPSERT, Collections.singletonList(partition2),
partitionToFilesNameLengthMap, false, false);
// We do the tag again
taggedRecordRDD = tagLocation(bloomIndex, recordRDD, HoodieSparkTable.create(config, context, metaClient));
@@ -327,7 +413,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// Check results
for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getRecordKey().equals(rowKey1)) {
if (record.getPartitionPath().equals("2015/01/31")) {
if (record.getPartitionPath().equals(partition2)) {
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
} else {
assertEquals(record.getCurrentLocation().getFileId(), fileId1);
@@ -370,7 +456,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
// Also create the metadata and config
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
@@ -387,10 +473,38 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
assertTrue(!record._2.isPresent());
}
final String partition1 = "2016/01/31";
final String partition2 = "2015/01/31";
final String fileId1 = UUID.randomUUID().toString();
final String fileId2 = UUID.randomUUID().toString();
final String fileId3 = UUID.randomUUID().toString();
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
// We create three parquet file, each having one record. (two different partitions)
String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
final String commit1 = "0000001";
Path baseFilePath = testTable.forCommit(commit1).withInserts(partition1, fileId1, Collections.singletonList(record1));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partition1,
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(partition1),
partitionToFilesNameLengthMap, false, false);
final String commit2 = "0000002";
partitionToFilesNameLengthMap.clear();
baseFilePath = testTable.forCommit(commit2).withInserts(partition1, fileId2, Collections.singletonList(record2));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partition1,
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(partition1),
partitionToFilesNameLengthMap, false, false);
final String commit3 = "0000003";
partitionToFilesNameLengthMap.clear();
baseFilePath = testTable.forCommit(commit3).withInserts(partition2, fileId3, Collections.singletonList(record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partition2,
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commit3, WriteOperationType.UPSERT, Collections.singletonList(partition2),
partitionToFilesNameLengthMap, false, false);
// We do the tag again
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -409,7 +523,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
assertEquals(fileId1, record._2.get().getRight());
} else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
assertTrue(record._2.isPresent());
if (record._1.getPartitionPath().equals("2015/01/31")) {
if (record._1.getPartitionPath().equals(partition2)) {
assertEquals(fileId3, record._2.get().getRight());
} else {
assertEquals(fileId2, record._2.get().getRight());

View File

@@ -18,22 +18,25 @@
package org.apache.hudi.index.bloom;
import org.apache.hudi.client.functional.TestHoodieMetadataBase;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
@@ -41,12 +44,14 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -59,7 +64,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
public class TestHoodieGlobalBloomIndex extends TestHoodieMetadataBase {
private static final Schema SCHEMA = getSchemaFromResource(TestHoodieGlobalBloomIndex.class, "/exampleSchema.avsc", true);
@@ -67,7 +72,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts();
initPath();
initFileSystem();
initMetaClient();
HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(indexBuilder.build())
.build();
writeClient = getHoodieWriteClient(config);
}
@AfterEach
@@ -81,13 +92,15 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
HoodieGlobalBloomIndex index =
new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
// Create some partitions, and put some files, along with the meta file
// "2016/01/21": 0 file
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
final String p1 = "2016/01/21";
final String p2 = "2016/04/01";
final String p3 = "2015/03/12";
RawTripTestPayload rowChange1 =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -107,16 +120,46 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
new HoodieAvroRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
List<String> partitions = Arrays.asList(p1, p2);
// partitions will NOT be respected by this loadInvolvedFiles(...) call
List<Pair<String, BloomIndexFileInfo>> filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable);
// Still 0, as no valid commit
assertEquals(0, filesList.size());
testTable.addCommit("20160401010101").withInserts("2016/04/01", "2");
testTable.addCommit("20150312101010").withInserts("2015/03/12", "1")
.withInserts("2015/03/12", "3", record1)
.withInserts("2015/03/12", "4", record2, record3, record4);
final String fileId1 = "1";
final String fileId2 = "2";
final String fileId3 = "3";
final String fileId4 = "4";
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
final String c1 = "20160401010101";
Path baseFilePath = testTable.forCommit(c1).withInserts(p2, fileId2, Collections.emptyList());
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p2,
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(c1, WriteOperationType.UPSERT, Collections.singletonList(p2),
partitionToFilesNameLengthMap, false, false);
final String c2 = "20150312101010";
testTable.forCommit(c2);
baseFilePath = testTable.withInserts(p3, fileId1, Collections.emptyList());
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(p3,
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
baseFilePath = testTable.withInserts(p3, fileId3, Collections.singletonList(record1));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p3,
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
baseFilePath = testTable.withInserts(p3, fileId4, Arrays.asList(record2, record3, record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p3,
k -> new ArrayList<>()).add(Pair.of(fileId4, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(c2, WriteOperationType.UPSERT, Collections.singletonList(p3),
partitionToFilesNameLengthMap, false, false);
filesList = index.loadColumnRangesFromFiles(partitions, context, hoodieTable);
assertEquals(4, filesList.size());
@@ -185,17 +228,21 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
@Test
public void testTagLocation() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(false).build()).build();
HoodieGlobalBloomIndex index =
new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
.withBloomIndexUpdatePartitionPath(false)
.build())
.build();
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
// Create some partitions, and put some files, along with the meta file
// "2016/01/21": 0 file
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet)
testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12");
final String partition2 = "2016/04/01";
final String partition3 = "2015/03/12";
RawTripTestPayload rowChange1 =
new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
@@ -223,13 +270,49 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
HoodieRecord record5 =
new HoodieAvroRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange5);
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
final String fileId1 = UUID.randomUUID().toString();
final String fileId2 = UUID.randomUUID().toString();
final String fileId3 = UUID.randomUUID().toString();
final String fileId4 = UUID.randomUUID().toString();
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
String fileId1 = testTable.addCommit("1000").getFileIdWithInserts("2016/04/01", record1);
String fileId2 = testTable.addCommit("2000").getFileIdWithInserts("2015/03/12");
String fileId3 = testTable.addCommit("3000").getFileIdWithInserts("2015/03/12", record2);
String fileId4 = testTable.addCommit("4000").getFileIdWithInserts("2015/03/12", record4);
String commitTime = "0000001";
Path baseFilePath = testTable.forCommit(commitTime).withInserts(partition2, fileId1, Collections.singletonList(record1));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(partition2,
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition2),
partitionToFilesNameLengthMap, false, false);
commitTime = "0000002";
baseFilePath = testTable.forCommit(commitTime).withInserts(partition3, fileId2, Collections.emptyList());
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(partition3,
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition3),
partitionToFilesNameLengthMap, false, false);
commitTime = "0000003";
baseFilePath = testTable.forCommit(commitTime).withInserts(partition3, fileId3, Collections.singletonList(record2));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(partition3,
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition3),
partitionToFilesNameLengthMap, false, false);
commitTime = "0000004";
baseFilePath = testTable.forCommit(commitTime).withInserts(partition3, fileId4, Collections.singletonList(record4));
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.clear();
partitionToFilesNameLengthMap.computeIfAbsent(partition3,
k -> new ArrayList<>()).add(Pair.of(fileId4, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList(partition3),
partitionToFilesNameLengthMap, false, false);
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
// partitions will NOT be respected by this loadInvolvedFiles(...) call
JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(index, recordRDD, hoodieTable);
@@ -266,12 +349,15 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
public void testTagLocationWhenShouldUpdatePartitionPath() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
.withBloomIndexUpdatePartitionPath(true)
.build())
.build();
HoodieGlobalBloomIndex index =
new HoodieGlobalBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(hoodieTable, SCHEMA);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
final String p1 = "2016/01/31";
final String p2 = "2016/02/28";
@@ -309,7 +395,16 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
incomingPayloadSamePartition);
testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);
final String fileId1 = UUID.randomUUID().toString();
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
final String commitTime = "0000001";
Path baseFilePath = testTable.forCommit(commitTime).withInserts(p1, fileId1, Collections.singletonList(originalRecord));
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
partitionToFilesNameLengthMap.computeIfAbsent(p1,
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Arrays.asList(p1),
partitionToFilesNameLengthMap, false, false);
// test against incoming record with a different partition
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -65,7 +66,7 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema,
HoodieTableMetadataWriter metadataWriter) {
BloomFilter filter = BloomFilterFactory
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.DYNAMIC_V0.name());
return of(metaClient, schema, filter, metadataWriter);
}
@@ -108,11 +109,11 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
}
public HoodieSparkWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
return withInserts(partition, fileId, Arrays.asList(records));
}
public HoodieSparkWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
withInserts(partition, fileId, Arrays.asList(records));
return this;
}
public Path withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
return super.withInserts(partition, fileId, records, new SparkTaskContextSupplier());
}
}