|
|
|
|
@@ -38,6 +38,7 @@ import org.apache.hudi.data.HoodieJavaPairRDD;
|
|
|
|
|
import org.apache.hudi.data.HoodieJavaRDD;
|
|
|
|
|
import org.apache.hudi.index.HoodieIndex;
|
|
|
|
|
import org.apache.hudi.index.HoodieIndexUtils;
|
|
|
|
|
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
|
|
|
|
import org.apache.hudi.table.HoodieSparkTable;
|
|
|
|
|
import org.apache.hudi.table.HoodieTable;
|
|
|
|
|
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
|
|
|
|
|
@@ -61,12 +62,14 @@ import java.util.HashMap;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Random;
|
|
|
|
|
import java.util.UUID;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
|
|
|
|
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudoRandomUUID;
|
|
|
|
|
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
|
|
|
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
|
|
|
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
|
|
@@ -78,15 +81,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
|
|
public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.avsc", true);
|
|
|
|
|
private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}";
|
|
|
|
|
private static final String TEST_NAME_WITH_PARAMS =
|
|
|
|
|
"[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}, useMetadataTable={3}";
|
|
|
|
|
private static final Random RANDOM = new Random(0xDEED);
|
|
|
|
|
|
|
|
|
|
public static Stream<Arguments> configParams() {
|
|
|
|
|
// rangePruning, treeFiltering, bucketizedChecking
|
|
|
|
|
// rangePruning, treeFiltering, bucketizedChecking, useMetadataTable
|
|
|
|
|
Object[][] data = new Object[][] {
|
|
|
|
|
{true, true, true},
|
|
|
|
|
{false, true, true},
|
|
|
|
|
{true, true, false},
|
|
|
|
|
{true, false, true}
|
|
|
|
|
{true, true, true, false},
|
|
|
|
|
{false, true, true, false},
|
|
|
|
|
{true, true, false, false},
|
|
|
|
|
{true, false, true, false},
|
|
|
|
|
{true, true, true, true},
|
|
|
|
|
{false, true, true, true},
|
|
|
|
|
{true, true, false, true},
|
|
|
|
|
{true, false, true, true}
|
|
|
|
|
};
|
|
|
|
|
return Stream.of(data).map(Arguments::of);
|
|
|
|
|
}
|
|
|
|
|
@@ -110,24 +119,39 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
cleanupResources();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
|
|
|
|
private HoodieWriteConfig makeConfig(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking, boolean useMetadataTable) {
|
|
|
|
|
// For the bloom index to use column stats and bloom filters from metadata table,
|
|
|
|
|
// the following configs must be set to true:
|
|
|
|
|
// "hoodie.bloom.index.use.metadata"
|
|
|
|
|
// "hoodie.metadata.enable" (by default is true)
|
|
|
|
|
// "hoodie.metadata.index.column.stats.enable"
|
|
|
|
|
// "hoodie.metadata.index.bloom.filter.enable"
|
|
|
|
|
return HoodieWriteConfig.newBuilder().withPath(basePath)
|
|
|
|
|
.withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning)
|
|
|
|
|
.bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking)
|
|
|
|
|
.bloomIndexKeysPerBucket(2).build())
|
|
|
|
|
.withIndexConfig(HoodieIndexConfig.newBuilder()
|
|
|
|
|
.bloomIndexPruneByRanges(rangePruning)
|
|
|
|
|
.bloomIndexTreebasedFilter(treeFiltering)
|
|
|
|
|
.bloomIndexBucketizedChecking(bucketizedChecking)
|
|
|
|
|
.bloomIndexKeysPerBucket(2)
|
|
|
|
|
.bloomIndexUseMetadata(useMetadataTable)
|
|
|
|
|
.build())
|
|
|
|
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
|
|
|
|
|
.withMetadataIndexBloomFilter(false)
|
|
|
|
|
.withMetadataIndexColumnStats(false)
|
|
|
|
|
.withMetadataIndexBloomFilter(useMetadataTable)
|
|
|
|
|
.withMetadataIndexColumnStats(useMetadataTable)
|
|
|
|
|
.build())
|
|
|
|
|
.build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
|
|
|
|
public void testLoadInvolvedFiles(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) throws Exception {
|
|
|
|
|
HoodieWriteConfig config =
|
|
|
|
|
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
|
|
|
|
|
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
|
|
|
|
|
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
|
|
|
|
|
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
|
|
|
|
|
|
|
|
|
|
// Create some partitions, and put some files
|
|
|
|
|
@@ -218,8 +242,11 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
|
|
|
|
public void testRangePruning(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) {
|
|
|
|
|
HoodieWriteConfig config =
|
|
|
|
|
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
|
|
|
|
|
|
|
|
|
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
|
|
|
|
|
@@ -279,7 +306,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
|
|
|
|
|
final String commitTime = "0000001";
|
|
|
|
|
final String fileId = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId = genRandomUUID();
|
|
|
|
|
|
|
|
|
|
Path baseFilePath = testTable.forCommit(commitTime)
|
|
|
|
|
.withInserts(partition, fileId, Arrays.asList(record1, record2));
|
|
|
|
|
@@ -317,11 +344,14 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testTagLocationWithEmptyRDD(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
|
|
|
|
public void testTagLocationWithEmptyRDD(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) {
|
|
|
|
|
// No records to tag: start from an empty RDD
|
|
|
|
|
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
|
|
|
|
|
// Also create the metadata and config
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
|
|
|
|
HoodieWriteConfig config =
|
|
|
|
|
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
|
|
|
|
HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
|
|
|
|
|
|
|
|
|
|
@@ -335,11 +365,13 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
|
|
|
|
public void testTagLocationOnPartitionedTable(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) throws Exception {
|
|
|
|
|
// We have some records to be tagged (two different partitions)
|
|
|
|
|
String rowKey1 = UUID.randomUUID().toString();
|
|
|
|
|
String rowKey2 = UUID.randomUUID().toString();
|
|
|
|
|
String rowKey3 = UUID.randomUUID().toString();
|
|
|
|
|
String rowKey1 = genRandomUUID();
|
|
|
|
|
String rowKey2 = genRandomUUID();
|
|
|
|
|
String rowKey3 = genRandomUUID();
|
|
|
|
|
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
|
|
|
|
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
|
|
|
|
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
|
|
|
|
|
@@ -360,8 +392,9 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
|
|
|
|
|
|
|
|
|
|
// Also create the metadata and config
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
|
|
|
|
|
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
|
|
|
|
|
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
|
|
|
|
|
|
|
|
|
|
// Let's tag
|
|
|
|
|
@@ -378,7 +411,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
final String partition2 = "2015/01/31";
|
|
|
|
|
|
|
|
|
|
// We create three parquet files, each having one record. (two different partitions)
|
|
|
|
|
final String fileId1 = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId1 = genRandomUUID();
|
|
|
|
|
final String commit1 = "0000001";
|
|
|
|
|
Path baseFilePath = testTable.forCommit(commit1).withInserts(partition1, fileId1, Collections.singletonList(record1));
|
|
|
|
|
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
|
|
|
|
|
@@ -387,7 +420,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(partition1),
|
|
|
|
|
partitionToFilesNameLengthMap, false, false);
|
|
|
|
|
|
|
|
|
|
final String fileId2 = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId2 = genRandomUUID();
|
|
|
|
|
final String commit2 = "0000002";
|
|
|
|
|
baseFilePath = testTable.forCommit(commit2).withInserts(partition1, fileId2, Collections.singletonList(record2));
|
|
|
|
|
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
|
|
|
|
|
@@ -397,7 +430,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(partition1),
|
|
|
|
|
partitionToFilesNameLengthMap, false, false);
|
|
|
|
|
|
|
|
|
|
final String fileId3 = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId3 = genRandomUUID();
|
|
|
|
|
final String commit3 = "0000003";
|
|
|
|
|
baseFilePath = testTable.forCommit(commit3).withInserts(partition2, fileId3, Collections.singletonList(record4));
|
|
|
|
|
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
|
|
|
|
|
@@ -408,6 +441,7 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
partitionToFilesNameLengthMap, false, false);
|
|
|
|
|
|
|
|
|
|
// We do the tag again
|
|
|
|
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
|
|
|
|
taggedRecordRDD = tagLocation(bloomIndex, recordRDD, HoodieSparkTable.create(config, context, metaClient));
|
|
|
|
|
|
|
|
|
|
// Check results
|
|
|
|
|
@@ -428,7 +462,99 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
|
|
|
|
public void testTagLocationOnNonpartitionedTable(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) throws Exception {
|
|
|
|
|
// We have some records to be tagged (two different partitions)
|
|
|
|
|
String rowKey1 = genRandomUUID();
|
|
|
|
|
String rowKey2 = genRandomUUID();
|
|
|
|
|
String rowKey3 = genRandomUUID();
|
|
|
|
|
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
|
|
|
|
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
|
|
|
|
|
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
|
|
|
|
|
|
|
|
|
|
String emptyPartitionPath = "";
|
|
|
|
|
RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
|
|
|
|
|
HoodieRecord record1 =
|
|
|
|
|
new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), emptyPartitionPath), rowChange1);
|
|
|
|
|
RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
|
|
|
|
|
HoodieRecord record2 =
|
|
|
|
|
new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), emptyPartitionPath), rowChange2);
|
|
|
|
|
RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
|
|
|
|
|
HoodieRecord record3 =
|
|
|
|
|
new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), emptyPartitionPath), rowChange3);
|
|
|
|
|
|
|
|
|
|
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3));
|
|
|
|
|
|
|
|
|
|
// Also create the metadata and config
|
|
|
|
|
HoodieWriteConfig config =
|
|
|
|
|
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
|
|
|
|
|
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
|
|
|
|
|
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
|
|
|
|
|
|
|
|
|
|
// Let's tag
|
|
|
|
|
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
|
|
|
|
|
JavaRDD<HoodieRecord> taggedRecordRDD = tagLocation(bloomIndex, recordRDD, hoodieTable);
|
|
|
|
|
|
|
|
|
|
// Should not find any files
|
|
|
|
|
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
|
|
|
|
assertFalse(record.isCurrentLocationKnown());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
// We create three parquet file, each having one record
|
|
|
|
|
final String fileId1 = genRandomUUID();
|
|
|
|
|
final String commit1 = "0000001";
|
|
|
|
|
Path baseFilePath = testTable.forCommit(commit1).withInserts(emptyPartitionPath, fileId1, Collections.singletonList(record1));
|
|
|
|
|
long baseFileLength = fs.getFileStatus(baseFilePath).getLen();
|
|
|
|
|
partitionToFilesNameLengthMap.computeIfAbsent(emptyPartitionPath,
|
|
|
|
|
k -> new ArrayList<>()).add(Pair.of(fileId1, Integer.valueOf((int) baseFileLength)));
|
|
|
|
|
testTable.doWriteOperation(commit1, WriteOperationType.UPSERT, Collections.singletonList(emptyPartitionPath),
|
|
|
|
|
partitionToFilesNameLengthMap, false, false);
|
|
|
|
|
|
|
|
|
|
final String fileId2 = genRandomUUID();
|
|
|
|
|
final String commit2 = "0000002";
|
|
|
|
|
baseFilePath = testTable.forCommit(commit2).withInserts(emptyPartitionPath, fileId2, Collections.singletonList(record2));
|
|
|
|
|
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
|
|
|
|
|
partitionToFilesNameLengthMap.clear();
|
|
|
|
|
partitionToFilesNameLengthMap.computeIfAbsent(emptyPartitionPath,
|
|
|
|
|
k -> new ArrayList<>()).add(Pair.of(fileId2, Integer.valueOf((int) baseFileLength)));
|
|
|
|
|
testTable.doWriteOperation(commit2, WriteOperationType.UPSERT, Collections.singletonList(emptyPartitionPath),
|
|
|
|
|
partitionToFilesNameLengthMap, false, false);
|
|
|
|
|
|
|
|
|
|
final String fileId3 = UUID.randomUUID().toString();
|
|
|
|
|
final String commit3 = "0000003";
|
|
|
|
|
baseFilePath = testTable.forCommit(commit3).withInserts(emptyPartitionPath, fileId3, Collections.singletonList(record3));
|
|
|
|
|
baseFileLength = fs.getFileStatus(baseFilePath).getLen();
|
|
|
|
|
partitionToFilesNameLengthMap.clear();
|
|
|
|
|
partitionToFilesNameLengthMap.computeIfAbsent(emptyPartitionPath,
|
|
|
|
|
k -> new ArrayList<>()).add(Pair.of(fileId3, Integer.valueOf((int) baseFileLength)));
|
|
|
|
|
testTable.doWriteOperation(commit3, WriteOperationType.UPSERT, Collections.singletonList(emptyPartitionPath),
|
|
|
|
|
partitionToFilesNameLengthMap, false, false);
|
|
|
|
|
|
|
|
|
|
// We do the tag again
|
|
|
|
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
|
|
|
|
taggedRecordRDD = tagLocation(bloomIndex, recordRDD, HoodieSparkTable.create(config, context, metaClient));
|
|
|
|
|
|
|
|
|
|
// Check results
|
|
|
|
|
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
|
|
|
|
if (record.getRecordKey().equals(rowKey1)) {
|
|
|
|
|
assertEquals(record.getCurrentLocation().getFileId(), fileId1);
|
|
|
|
|
} else if (record.getRecordKey().equals(rowKey2)) {
|
|
|
|
|
assertEquals(record.getCurrentLocation().getFileId(), fileId2);
|
|
|
|
|
} else if (record.getRecordKey().equals(rowKey3)) {
|
|
|
|
|
assertEquals(record.getCurrentLocation().getFileId(), fileId3);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testCheckExists(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) throws Exception {
|
|
|
|
|
// We have some records to be tagged (two different partitions)
|
|
|
|
|
|
|
|
|
|
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
|
|
|
|
|
@@ -454,8 +580,10 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4));
|
|
|
|
|
|
|
|
|
|
// Also create the metadata and config
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
|
|
|
|
HoodieWriteConfig config =
|
|
|
|
|
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
|
|
|
|
|
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
|
|
|
|
|
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter);
|
|
|
|
|
|
|
|
|
|
// Let's tag
|
|
|
|
|
@@ -475,9 +603,9 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
final String partition1 = "2016/01/31";
|
|
|
|
|
final String partition2 = "2015/01/31";
|
|
|
|
|
final String fileId1 = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId2 = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId3 = UUID.randomUUID().toString();
|
|
|
|
|
final String fileId1 = genRandomUUID();
|
|
|
|
|
final String fileId2 = genRandomUUID();
|
|
|
|
|
final String fileId3 = genRandomUUID();
|
|
|
|
|
final Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap = new HashMap<>();
|
|
|
|
|
// We create three parquet files, each having one record. (two different partitions)
|
|
|
|
|
final String commit1 = "0000001";
|
|
|
|
|
@@ -536,7 +664,9 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
|
|
|
|
|
@MethodSource("configParams")
|
|
|
|
|
public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
|
|
|
|
|
public void testBloomFilterFalseError(
|
|
|
|
|
boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking,
|
|
|
|
|
boolean useMetadataTable) throws Exception {
|
|
|
|
|
// We have two hoodie records
|
|
|
|
|
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
|
|
|
|
|
+ "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
|
|
|
|
|
@@ -561,7 +691,8 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
|
|
|
|
|
// We do the tag
|
|
|
|
|
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
|
|
|
|
|
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
|
|
|
|
|
HoodieWriteConfig config =
|
|
|
|
|
makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable);
|
|
|
|
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
|
|
|
|
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
|
|
|
|
|
|
|
|
|
|
@@ -577,4 +708,8 @@ public class TestHoodieBloomIndex extends TestHoodieMetadataBase {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static String genRandomUUID() {
|
|
|
|
|
return genPseudoRandomUUID(RANDOM).toString();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|