feature(HoodieGlobalBloomIndex): adds a new type of bloom index to allow global record key lookup
This commit is contained in:
committed by
vinoth chandar
parent
7ba842c0fe
commit
98fd97b65f
@@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieIndexException;
|
||||
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
|
||||
import com.uber.hoodie.index.bloom.HoodieGlobalBloomIndex;
|
||||
import com.uber.hoodie.index.bucketed.BucketedIndex;
|
||||
import com.uber.hoodie.index.hbase.HBaseIndex;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
@@ -53,6 +54,8 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
||||
return new InMemoryHashIndex<>(config);
|
||||
case BLOOM:
|
||||
return new HoodieBloomIndex<>(config);
|
||||
case GLOBAL_BLOOM:
|
||||
return new HoodieGlobalBloomIndex<>(config);
|
||||
case BUCKETED:
|
||||
return new BucketedIndex<>(config);
|
||||
default:
|
||||
@@ -116,6 +119,6 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
||||
|
||||
|
||||
public enum IndexType {
|
||||
HBASE, INMEMORY, BLOOM, BUCKETED
|
||||
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, BUCKETED
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,7 +303,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
* if we dont have key ranges, then also we need to compare against the file. no other choice if
|
||||
* we do, then only compare the file if the record key falls in range.
|
||||
*/
|
||||
private boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) {
|
||||
boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) {
|
||||
return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey);
|
||||
}
|
||||
|
||||
@@ -313,10 +313,11 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
* record's key needs to be checked. For datasets, where the keys have a definite insert order
|
||||
* (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range
|
||||
* pruning.
|
||||
*
|
||||
* Sub-partition to ensure the records can be looked up against files & also prune
|
||||
* file<=>record comparisons based on recordKey
|
||||
* ranges in the index info.
|
||||
*/
|
||||
// sub-partition to ensure the records can be looked up against files & also prune
|
||||
// file<=>record comparisons based on recordKey
|
||||
// ranges in the index info.
|
||||
@VisibleForTesting
|
||||
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
/*
|
||||
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.index.bloom;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* This filter will only work with hoodie dataset since it will only load partitions
|
||||
* with .hoodie_partition_metadata file in it.
|
||||
*/
|
||||
public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
|
||||
|
||||
public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
|
||||
super(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load all involved files as <Partition, filename> pair RDD from all partitions in the table.
|
||||
*/
|
||||
@Override
|
||||
@VisibleForTesting
|
||||
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
|
||||
final HoodieTable hoodieTable) {
|
||||
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
||||
try {
|
||||
List<String> allPartitionPaths = FSUtils
|
||||
.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
|
||||
config.shouldAssumeDatePartitioning());
|
||||
return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to load all partitions", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For each incoming record, produce N output records, 1 each for each file against which the
|
||||
* record's key needs to be checked. For datasets, where the keys have a definite insert order
|
||||
* (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range
|
||||
* pruning.
|
||||
*
|
||||
* Sub-partition to ensure the records can be looked up against files & also prune
|
||||
* file<=>record comparisons based on recordKey
|
||||
* ranges in the index info.
|
||||
* the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will be ignored
|
||||
* since the search scope should be bigger than that
|
||||
*/
|
||||
@Override
|
||||
@VisibleForTesting
|
||||
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
||||
List<Tuple2<String, BloomIndexFileInfo>> indexInfos =
|
||||
partitionToFileIndexInfo.entrySet().stream()
|
||||
.flatMap(e1 -> e1.getValue().stream()
|
||||
.map(e2 -> new Tuple2<>(e1.getKey(), e2)))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
|
||||
String recordKey = partitionRecordKeyPair._2();
|
||||
|
||||
List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
|
||||
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
|
||||
// for each candidate file in partition, that needs to be compared.
|
||||
for (Tuple2<String, BloomIndexFileInfo> indexInfo : indexInfos) {
|
||||
if (shouldCompareWithFile(indexInfo._2(), recordKey)) {
|
||||
recordComparisons.add(
|
||||
new Tuple2<>(String.format("%s#%s", indexInfo._2().getFileName(), recordKey),
|
||||
new Tuple2<>(indexInfo._2().getFileName(),
|
||||
new HoodieKey(recordKey, indexInfo._1()))));
|
||||
}
|
||||
}
|
||||
}
|
||||
return recordComparisons;
|
||||
}).flatMapToPair(t -> t.iterator());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -195,7 +195,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
"Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId);
|
||||
} else {
|
||||
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
|
||||
ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath())
|
||||
ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath())
|
||||
.withConf(getHadoopConf()).build();
|
||||
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
|
||||
try {
|
||||
|
||||
@@ -18,29 +18,36 @@ package com.uber.hoodie.common;
|
||||
|
||||
import com.uber.hoodie.HoodieReadClient;
|
||||
import com.uber.hoodie.WriteStatus;
|
||||
import com.uber.hoodie.avro.HoodieAvroWriteSupport;
|
||||
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||
import com.uber.hoodie.exception.HoodieException;
|
||||
import com.uber.hoodie.io.storage.HoodieParquetConfig;
|
||||
import com.uber.hoodie.io.storage.HoodieParquetWriter;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.hadoop.ParquetWriter;
|
||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
@@ -174,4 +181,59 @@ public class HoodieClientTestUtils {
|
||||
throw new HoodieException("Error reading hoodie dataset as a dataframe", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static String writeParquetFile(String basePath,
|
||||
String partitionPath,
|
||||
String filename,
|
||||
List<HoodieRecord> records,
|
||||
Schema schema,
|
||||
BloomFilter filter,
|
||||
boolean createCommitTime) throws IOException {
|
||||
|
||||
if (filter == null) {
|
||||
filter = new BloomFilter(10000, 0.0000001);
|
||||
}
|
||||
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema,
|
||||
filter);
|
||||
String commitTime = FSUtils.getCommitTime(filename);
|
||||
HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
||||
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
|
||||
HoodieTestUtils.getDefaultHadoopConf(),
|
||||
Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
|
||||
HoodieParquetWriter writer = new HoodieParquetWriter(
|
||||
commitTime,
|
||||
new Path(basePath + "/" + partitionPath + "/" + filename),
|
||||
config,
|
||||
schema);
|
||||
int seqId = 1;
|
||||
for (HoodieRecord record : records) {
|
||||
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
|
||||
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++);
|
||||
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename);
|
||||
writer.writeAvro(record.getRecordKey(), avroRecord);
|
||||
filter.add(record.getRecordKey());
|
||||
}
|
||||
writer.close();
|
||||
|
||||
if (createCommitTime) {
|
||||
HoodieTestUtils.createMetadataFolder(basePath);
|
||||
HoodieTestUtils.createCommitFiles(basePath, commitTime);
|
||||
}
|
||||
return filename;
|
||||
}
|
||||
|
||||
public static String writeParquetFile(String basePath,
|
||||
String partitionPath,
|
||||
List<HoodieRecord> records,
|
||||
Schema schema,
|
||||
BloomFilter filter,
|
||||
boolean createCommitTime) throws IOException, InterruptedException {
|
||||
Thread.sleep(1000);
|
||||
String commitTime = HoodieTestUtils.makeNewCommitTime();
|
||||
String fileId = UUID.randomUUID().toString();
|
||||
String filename = FSUtils.makeDataFileName(commitTime, 1, fileId);
|
||||
HoodieTestUtils.createCommitFiles(basePath, commitTime);
|
||||
return HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, partitionPath, filename, records, schema, filter, createCommitTime);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.uber.hoodie.avro.HoodieAvroWriteSupport;
|
||||
import com.uber.hoodie.common.BloomFilter;
|
||||
import com.uber.hoodie.common.HoodieClientTestUtils;
|
||||
import com.uber.hoodie.common.TestRawTripPayload;
|
||||
@@ -37,29 +36,19 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.io.storage.HoodieParquetConfig;
|
||||
import com.uber.hoodie.io.storage.HoodieParquetWriter;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.hadoop.ParquetWriter;
|
||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
@@ -173,10 +162,18 @@ public class TestHoodieBloomIndex {
|
||||
HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()),
|
||||
rowChange4);
|
||||
|
||||
writeParquetFile("2016/04/01", "2_0_20160401010101.parquet", Lists.newArrayList(), schema, null, false);
|
||||
writeParquetFile("2015/03/12", "1_0_20150312101010.parquet", Lists.newArrayList(), schema, null, false);
|
||||
writeParquetFile("2015/03/12", "3_0_20150312101010.parquet", Arrays.asList(record1), schema, null, false);
|
||||
writeParquetFile("2015/03/12", "4_0_20150312101010.parquet", Arrays.asList(record2, record3, record4), schema, null,
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet",
|
||||
Lists.newArrayList(), schema, null, false);
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet",
|
||||
Lists.newArrayList(), schema, null, false);
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet",
|
||||
Arrays.asList(record1), schema, null, false);
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet",
|
||||
Arrays.asList(record2, record3, record4), schema, null,
|
||||
false);
|
||||
|
||||
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
|
||||
@@ -270,7 +267,9 @@ public class TestHoodieBloomIndex {
|
||||
// record2, record3).
|
||||
BloomFilter filter = new BloomFilter(10000, 0.0000001);
|
||||
filter.add(record3.getRecordKey());
|
||||
String filename = writeParquetFile("2016/01/31", Arrays.asList(record1, record2), schema, filter, true);
|
||||
String filename = HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2016/01/31",
|
||||
Arrays.asList(record1, record2), schema, filter, true);
|
||||
|
||||
// The bloom filter contains 3 records
|
||||
assertTrue(filter.mightContain(record1.getRecordKey()));
|
||||
@@ -355,9 +354,12 @@ public class TestHoodieBloomIndex {
|
||||
}
|
||||
|
||||
// We create three parquet file, each having one record. (two different partitions)
|
||||
String filename1 = writeParquetFile("2016/01/31", Arrays.asList(record1), schema, null, true);
|
||||
String filename2 = writeParquetFile("2016/01/31", Arrays.asList(record2), schema, null, true);
|
||||
String filename3 = writeParquetFile("2015/01/31", Arrays.asList(record4), schema, null, true);
|
||||
String filename1 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true);
|
||||
String filename2 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true);
|
||||
String filename3 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
|
||||
|
||||
// We do the tag again
|
||||
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
@@ -420,9 +422,12 @@ public class TestHoodieBloomIndex {
|
||||
}
|
||||
|
||||
// We create three parquet file, each having one record. (two different partitions)
|
||||
String filename1 = writeParquetFile("2016/01/31", Arrays.asList(record1), schema, null, true);
|
||||
String filename2 = writeParquetFile("2016/01/31", Arrays.asList(record2), schema, null, true);
|
||||
String filename3 = writeParquetFile("2015/01/31", Arrays.asList(record4), schema, null, true);
|
||||
String filename1 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true);
|
||||
String filename2 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true);
|
||||
String filename3 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
|
||||
|
||||
// We do the tag again
|
||||
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
@@ -468,7 +473,9 @@ public class TestHoodieBloomIndex {
|
||||
|
||||
BloomFilter filter = new BloomFilter(10000, 0.0000001);
|
||||
filter.add(record2.getRecordKey());
|
||||
String filename = writeParquetFile("2016/01/31", Arrays.asList(record1), schema, filter, true);
|
||||
String filename = HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2016/01/31",
|
||||
Arrays.asList(record1), schema, filter, true);
|
||||
assertTrue(filter.mightContain(record1.getRecordKey()));
|
||||
assertTrue(filter.mightContain(record2.getRecordKey()));
|
||||
|
||||
@@ -491,49 +498,4 @@ public class TestHoodieBloomIndex {
|
||||
}
|
||||
}
|
||||
|
||||
private String writeParquetFile(String partitionPath, List<HoodieRecord> records, Schema schema, BloomFilter filter,
|
||||
boolean createCommitTime) throws IOException, InterruptedException {
|
||||
Thread.sleep(1000);
|
||||
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||
String fileId = UUID.randomUUID().toString();
|
||||
String filename = FSUtils.makeDataFileName(commitTime, 1, fileId);
|
||||
|
||||
return writeParquetFile(partitionPath, filename, records, schema, filter, createCommitTime);
|
||||
}
|
||||
|
||||
private String writeParquetFile(String partitionPath, String filename, List<HoodieRecord> records, Schema schema,
|
||||
BloomFilter filter, boolean createCommitTime) throws IOException {
|
||||
|
||||
if (filter == null) {
|
||||
filter = new BloomFilter(10000, 0.0000001);
|
||||
}
|
||||
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema,
|
||||
filter);
|
||||
String commitTime = FSUtils.getCommitTime(filename);
|
||||
HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
|
||||
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
|
||||
HoodieTestUtils.getDefaultHadoopConf(),
|
||||
Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO));
|
||||
HoodieParquetWriter writer = new HoodieParquetWriter(
|
||||
commitTime,
|
||||
new Path(basePath + "/" + partitionPath + "/" + filename),
|
||||
config,
|
||||
schema);
|
||||
int seqId = 1;
|
||||
for (HoodieRecord record : records) {
|
||||
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
|
||||
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++);
|
||||
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename);
|
||||
writer.writeAvro(record.getRecordKey(), avroRecord);
|
||||
filter.add(record.getRecordKey());
|
||||
}
|
||||
writer.close();
|
||||
|
||||
if (createCommitTime) {
|
||||
// Also make sure the commit is valid
|
||||
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
|
||||
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + ".commit").createNewFile();
|
||||
}
|
||||
return filename;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,308 @@
|
||||
/*
|
||||
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.index.bloom;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.uber.hoodie.common.HoodieClientTestUtils;
|
||||
import com.uber.hoodie.common.TestRawTripPayload;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class TestHoodieGlobalBloomIndex {
|
||||
|
||||
private JavaSparkContext jsc = null;
|
||||
private String basePath = null;
|
||||
private transient FileSystem fs;
|
||||
private String schemaStr;
|
||||
private Schema schema;
|
||||
|
||||
public TestHoodieGlobalBloomIndex() throws Exception {
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() throws IOException {
|
||||
// Initialize a local spark env
|
||||
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieGlobalBloomIndex"));
|
||||
// Create a temp folder as the base path
|
||||
TemporaryFolder folder = new TemporaryFolder();
|
||||
folder.create();
|
||||
basePath = folder.getRoot().getAbsolutePath();
|
||||
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
|
||||
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
|
||||
// We have some records to be tagged (two different partitions)
|
||||
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
|
||||
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
|
||||
}
|
||||
|
||||
@After
|
||||
public void clean() {
|
||||
if (basePath != null) {
|
||||
new File(basePath).delete();
|
||||
}
|
||||
if (jsc != null) {
|
||||
jsc.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadInvolvedFiles() throws IOException {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
||||
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
|
||||
|
||||
// Create some partitions, and put some files, along with the meta file
|
||||
// "2016/01/21": 0 file
|
||||
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
|
||||
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet,
|
||||
// 4_0_20150312101010.parquet)
|
||||
new File(basePath + "/2016/01/21").mkdirs();
|
||||
new File(basePath + "/2016/01/21/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||
new File(basePath + "/2016/04/01").mkdirs();
|
||||
new File(basePath + "/2016/04/01/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||
new File(basePath + "/2015/03/12").mkdirs();
|
||||
new File(basePath + "/2015/03/12/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||
|
||||
TestRawTripPayload rowChange1 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
|
||||
rowChange1);
|
||||
TestRawTripPayload rowChange2 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()),
|
||||
rowChange2);
|
||||
TestRawTripPayload rowChange3 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()),
|
||||
rowChange3);
|
||||
TestRawTripPayload rowChange4 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()),
|
||||
rowChange4);
|
||||
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet",
|
||||
Lists.newArrayList(), schema, null, false);
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet",
|
||||
Lists.newArrayList(), schema, null, false);
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet",
|
||||
Arrays.asList(record1), schema, null, false);
|
||||
HoodieClientTestUtils
|
||||
.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet",
|
||||
Arrays.asList(record2, record3, record4), schema, null, false);
|
||||
|
||||
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
|
||||
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
|
||||
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
|
||||
// partitions will NOT be respected by this loadInvolvedFiles(...) call
|
||||
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
|
||||
// Still 0, as no valid commit
|
||||
assertEquals(filesList.size(), 0);
|
||||
|
||||
// Add some commits
|
||||
new File(basePath + "/.hoodie").mkdirs();
|
||||
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
|
||||
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
|
||||
|
||||
table = HoodieTable.getHoodieTable(metadata, config, jsc);
|
||||
filesList = index.loadInvolvedFiles(partitions, jsc, table);
|
||||
assertEquals(filesList.size(), 4);
|
||||
|
||||
Map<String, BloomIndexFileInfo> filesMap = toFileMap(filesList);
|
||||
// key ranges checks
|
||||
assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMaxRecordKey());
|
||||
assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMinRecordKey());
|
||||
assertFalse(filesMap.get("2015/03/12/1_0_20150312101010.parquet").hasKeyRanges());
|
||||
assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMaxRecordKey());
|
||||
assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMinRecordKey());
|
||||
assertTrue(filesMap.get("2015/03/12/3_0_20150312101010.parquet").hasKeyRanges());
|
||||
|
||||
Map<String, BloomIndexFileInfo> expected = new HashMap<>();
|
||||
expected.put("2016/04/01/2_0_20160401010101.parquet", new BloomIndexFileInfo("2_0_20160401010101.parquet"));
|
||||
expected.put("2015/03/12/1_0_20150312101010.parquet", new BloomIndexFileInfo("1_0_20150312101010.parquet"));
|
||||
expected.put("2015/03/12/3_0_20150312101010.parquet",
|
||||
new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000"));
|
||||
expected.put("2015/03/12/4_0_20150312101010.parquet",
|
||||
new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"));
|
||||
|
||||
assertEquals(expected, filesMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplodeRecordRDDWithFileComparisons() {
|
||||
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
||||
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
|
||||
|
||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
|
||||
partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(new BloomIndexFileInfo("f1"),
|
||||
new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003")));
|
||||
|
||||
partitionToFileIndexInfo.put("2017/10/23", Arrays.asList(
|
||||
new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010")));
|
||||
|
||||
// the partition partition of the key of the incoming records will be ignored
|
||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList(
|
||||
new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
|
||||
new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
|
||||
|
||||
List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
|
||||
partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
|
||||
|
||||
/* epecting:
|
||||
f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
|
||||
f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
|
||||
f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
|
||||
f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
|
||||
f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
|
||||
f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
|
||||
f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
|
||||
f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
|
||||
f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
|
||||
f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
|
||||
*/
|
||||
assertEquals(10, comparisonKeyList.size());
|
||||
|
||||
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
|
||||
t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
|
||||
|
||||
assertEquals(4, recordKeyToFileComps.size());
|
||||
assertEquals(Arrays.asList("f4", "f1", "f3"), recordKeyToFileComps.get("002"));
|
||||
assertEquals(Arrays.asList("f4", "f1", "f3"), recordKeyToFileComps.get("003"));
|
||||
assertEquals(Arrays.asList("f4", "f1"), recordKeyToFileComps.get("004"));
|
||||
assertEquals(Arrays.asList("f4", "f1"), recordKeyToFileComps.get("005"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTagLocation() throws Exception {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
||||
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config);
|
||||
|
||||
// Create some partitions, and put some files, along with the meta file
|
||||
// "2016/01/21": 0 file
|
||||
// "2016/04/01": 1 file (2_0_20160401010101.parquet)
|
||||
// "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet,
|
||||
// 4_0_20150312101010.parquet)
|
||||
new File(basePath + "/2016/01/21").mkdirs();
|
||||
new File(basePath + "/2016/01/21/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||
new File(basePath + "/2016/04/01").mkdirs();
|
||||
new File(basePath + "/2016/04/01/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||
new File(basePath + "/2015/03/12").mkdirs();
|
||||
new File(basePath + "/2015/03/12/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
|
||||
|
||||
TestRawTripPayload rowChange1 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
|
||||
rowChange1);
|
||||
TestRawTripPayload rowChange2 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()),
|
||||
rowChange2);
|
||||
TestRawTripPayload rowChange3 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()),
|
||||
rowChange3);
|
||||
|
||||
// this record will be saved in table and will be tagged to the incoming record5
|
||||
TestRawTripPayload rowChange4 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()),
|
||||
rowChange4);
|
||||
|
||||
// this has the same record key as record4 but different time so different partition, but globalbloomIndex should
|
||||
// tag the original partition of the saved record4
|
||||
TestRawTripPayload rowChange5 = new TestRawTripPayload(
|
||||
"{\"_row_key\":\"003\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
|
||||
HoodieRecord record5 = new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()),
|
||||
rowChange4);
|
||||
|
||||
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
|
||||
|
||||
String filename0 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false);
|
||||
String filename1 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false);
|
||||
String filename2 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false);
|
||||
String filename3 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
|
||||
|
||||
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
|
||||
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
|
||||
|
||||
|
||||
// Add some commits
|
||||
new File(basePath + "/.hoodie").mkdirs();
|
||||
|
||||
// partitions will NOT be respected by this loadInvolvedFiles(...) call
|
||||
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
|
||||
|
||||
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
||||
if (record.getRecordKey().equals("000")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
|
||||
} else if (record.getRecordKey().equals("001")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
|
||||
} else if (record.getRecordKey().equals("002")) {
|
||||
assertTrue(!record.isCurrentLocationKnown());
|
||||
} else if (record.getRecordKey().equals("004")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// convert list to map to avoid sorting order dependencies
|
||||
private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, BloomIndexFileInfo>> filesList) {
|
||||
Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
|
||||
for (Tuple2<String, BloomIndexFileInfo> t : filesList) {
|
||||
filesMap.put(t._1() + "/" + t._2().getFileName(), t._2());
|
||||
}
|
||||
return filesMap;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -126,6 +126,10 @@ public class HoodieTestUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static final void createMetadataFolder(String basePath) throws IOException {
|
||||
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
|
||||
}
|
||||
|
||||
public static final void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException {
|
||||
for (String commitTime : commitTimes) {
|
||||
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeInflightCommitFileName(
|
||||
|
||||
Reference in New Issue
Block a user