diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java index bc7dc7eb4..17897846d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java @@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIndexException; import com.uber.hoodie.index.bloom.HoodieBloomIndex; +import com.uber.hoodie.index.bloom.HoodieGlobalBloomIndex; import com.uber.hoodie.index.bucketed.BucketedIndex; import com.uber.hoodie.index.hbase.HBaseIndex; import com.uber.hoodie.table.HoodieTable; @@ -53,6 +54,8 @@ public abstract class HoodieIndex implements Seri return new InMemoryHashIndex<>(config); case BLOOM: return new HoodieBloomIndex<>(config); + case GLOBAL_BLOOM: + return new HoodieGlobalBloomIndex<>(config); case BUCKETED: return new BucketedIndex<>(config); default: @@ -116,6 +119,6 @@ public abstract class HoodieIndex implements Seri public enum IndexType { - HBASE, INMEMORY, BLOOM, BUCKETED + HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, BUCKETED } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index b0f37e8ac..050958159 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -303,7 +303,7 @@ public class HoodieBloomIndex extends HoodieIndex * if we dont have key ranges, then also we need to compare against the file. no other choice if * we do, then only compare the file if the record key falls in range. 
*/ - private boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) { + boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) { return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey); } @@ -313,10 +313,11 @@ public class HoodieBloomIndex extends HoodieIndex * record's key needs to be checked. For datasets, where the keys have a definite insert order * (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range * pruning. + * + * Sub-partition to ensure the records can be looked up against files & also prune + * file<=>record comparisons based on recordKey + * ranges in the index info. */ - // sub-partition to ensure the records can be looked up against files & also prune - // file<=>record comparisons based on recordKey - // ranges in the index info. @VisibleForTesting JavaPairRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java new file mode 100644 index 000000000..49c5fd3fc --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.index.bloom; + +import com.google.common.annotations.VisibleForTesting; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.table.HoodieTable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; + +/** + * This filter will only work with hoodie dataset since it will only load partitions + * with .hoodie_partition_metadata file in it. + */ +public class HoodieGlobalBloomIndex extends HoodieBloomIndex { + + public HoodieGlobalBloomIndex(HoodieWriteConfig config) { + super(config); + } + + /** + * Load all involved files as pair RDD from all partitions in the table. + */ + @Override + @VisibleForTesting + List> loadInvolvedFiles(List partitions, final JavaSparkContext jsc, + final HoodieTable hoodieTable) { + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + try { + List allPartitionPaths = FSUtils + .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + config.shouldAssumeDatePartitioning()); + return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable); + } catch (IOException e) { + throw new HoodieIOException("Failed to load all partitions", e); + } + } + + /** + * For each incoming record, produce N output records, 1 each for each file against which the + * record's key needs to be checked. For datasets, where the keys have a definite insert order + * (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range + * pruning. 
+ * + * Sub-partition to ensure the records can be looked up against files & also prune + * file<=>record comparisons based on recordKey + * ranges in the index info. + * The partition path of the incoming record (partitionRecordKeyPairRDD._1()) is ignored, + * since every file in partitionToFileIndexInfo is a candidate, regardless of its partition. + */ + @Override + @VisibleForTesting + JavaPairRDD> explodeRecordRDDWithFileComparisons( + final Map> partitionToFileIndexInfo, + JavaPairRDD partitionRecordKeyPairRDD) { + List> indexInfos = + partitionToFileIndexInfo.entrySet().stream() + .flatMap(e1 -> e1.getValue().stream() + .map(e2 -> new Tuple2<>(e1.getKey(), e2))) + .collect(Collectors.toList()); + + return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { + String recordKey = partitionRecordKeyPair._2(); + + List>> recordComparisons = new ArrayList<>(); + if (indexInfos != null) { // never null here (built by Collectors.toList() above); the check is purely defensive. + // for each candidate file across all partitions, that needs to be compared. 
+ for (Tuple2 indexInfo : indexInfos) { + if (shouldCompareWithFile(indexInfo._2(), recordKey)) { + recordComparisons.add( + new Tuple2<>(String.format("%s#%s", indexInfo._2().getFileName(), recordKey), + new Tuple2<>(indexInfo._2().getFileName(), + new HoodieKey(recordKey, indexInfo._1())))); + } + } + } + return recordComparisons; + }).flatMapToPair(t -> t.iterator()); + } + +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 81efc83d9..3fa8a175a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -195,7 +195,7 @@ public class HoodieCopyOnWriteTable extends Hoodi "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId); } else { AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); - ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) + ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) .withConf(getHadoopConf()).build(); BoundedInMemoryExecutor wrapper = null; try { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java index cc1dad8e1..54ea125b1 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java @@ -18,29 +18,36 @@ package com.uber.hoodie.common; import com.uber.hoodie.HoodieReadClient; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecord; +import 
com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.io.storage.HoodieParquetConfig; +import com.uber.hoodie.io.storage.HoodieParquetWriter; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -174,4 +181,59 @@ public class HoodieClientTestUtils { throw new HoodieException("Error reading hoodie dataset as a dataframe", e); } } + + public static String writeParquetFile(String basePath, + String partitionPath, + String filename, + List records, + Schema schema, + BloomFilter filter, + boolean createCommitTime) throws IOException { + + if (filter == null) { + filter = new BloomFilter(10000, 0.0000001); + } + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, + filter); + String commitTime = 
FSUtils.getCommitTime(filename); + HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, + ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, + HoodieTestUtils.getDefaultHadoopConf(), + Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); + HoodieParquetWriter writer = new HoodieParquetWriter( + commitTime, + new Path(basePath + "/" + partitionPath + "/" + filename), + config, + schema); + int seqId = 1; + for (HoodieRecord record : records) { + GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); + HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++); + HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename); + writer.writeAvro(record.getRecordKey(), avroRecord); + filter.add(record.getRecordKey()); + } + writer.close(); + + if (createCommitTime) { + HoodieTestUtils.createMetadataFolder(basePath); + HoodieTestUtils.createCommitFiles(basePath, commitTime); + } + return filename; + } + + public static String writeParquetFile(String basePath, + String partitionPath, + List records, + Schema schema, + BloomFilter filter, + boolean createCommitTime) throws IOException, InterruptedException { + Thread.sleep(1000); + String commitTime = HoodieTestUtils.makeNewCommitTime(); + String fileId = UUID.randomUUID().toString(); + String filename = FSUtils.makeDataFileName(commitTime, 1, fileId); + HoodieTestUtils.createCommitFiles(basePath, commitTime); + return HoodieClientTestUtils + .writeParquetFile(basePath, partitionPath, filename, records, schema, filter, createCommitTime); + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 82a4c1a29..5f602d7f7 100644 --- 
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -27,7 +27,6 @@ import static org.junit.Assert.fail; import com.google.common.base.Optional; import com.google.common.collect.Lists; -import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.TestRawTripPayload; @@ -37,29 +36,19 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; -import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.io.storage.HoodieParquetConfig; -import com.uber.hoodie.io.storage.HoodieParquetWriter; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.IOException; -import java.text.SimpleDateFormat; import java.util.Arrays; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.stream.Collectors; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -173,10 +162,18 @@ public class TestHoodieBloomIndex { HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); - writeParquetFile("2016/04/01", "2_0_20160401010101.parquet", 
Lists.newArrayList(), schema, null, false); - writeParquetFile("2015/03/12", "1_0_20150312101010.parquet", Lists.newArrayList(), schema, null, false); - writeParquetFile("2015/03/12", "3_0_20150312101010.parquet", Arrays.asList(record1), schema, null, false); - writeParquetFile("2015/03/12", "4_0_20150312101010.parquet", Arrays.asList(record2, record3, record4), schema, null, + HoodieClientTestUtils + .writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", + Lists.newArrayList(), schema, null, false); + HoodieClientTestUtils + .writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", + Lists.newArrayList(), schema, null, false); + HoodieClientTestUtils + .writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", + Arrays.asList(record1), schema, null, false); + HoodieClientTestUtils + .writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", + Arrays.asList(record2, record3, record4), schema, null, false); List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); @@ -270,7 +267,9 @@ public class TestHoodieBloomIndex { // record2, record3). BloomFilter filter = new BloomFilter(10000, 0.0000001); filter.add(record3.getRecordKey()); - String filename = writeParquetFile("2016/01/31", Arrays.asList(record1, record2), schema, filter, true); + String filename = HoodieClientTestUtils + .writeParquetFile(basePath, "2016/01/31", + Arrays.asList(record1, record2), schema, filter, true); // The bloom filter contains 3 records assertTrue(filter.mightContain(record1.getRecordKey())); @@ -355,9 +354,12 @@ public class TestHoodieBloomIndex { } // We create three parquet file, each having one record. 
(two different partitions) - String filename1 = writeParquetFile("2016/01/31", Arrays.asList(record1), schema, null, true); - String filename2 = writeParquetFile("2016/01/31", Arrays.asList(record2), schema, null, true); - String filename3 = writeParquetFile("2015/01/31", Arrays.asList(record4), schema, null, true); + String filename1 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true); + String filename2 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true); + String filename3 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); // We do the tag again metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); @@ -420,9 +422,12 @@ public class TestHoodieBloomIndex { } // We create three parquet file, each having one record. (two different partitions) - String filename1 = writeParquetFile("2016/01/31", Arrays.asList(record1), schema, null, true); - String filename2 = writeParquetFile("2016/01/31", Arrays.asList(record2), schema, null, true); - String filename3 = writeParquetFile("2015/01/31", Arrays.asList(record4), schema, null, true); + String filename1 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true); + String filename2 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true); + String filename3 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); // We do the tag again metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); @@ -468,7 +473,9 @@ public class TestHoodieBloomIndex { BloomFilter filter = new BloomFilter(10000, 0.0000001); filter.add(record2.getRecordKey()); - String filename = writeParquetFile("2016/01/31", Arrays.asList(record1), schema, filter, true); + String filename = 
HoodieClientTestUtils + .writeParquetFile(basePath, "2016/01/31", + Arrays.asList(record1), schema, filter, true); assertTrue(filter.mightContain(record1.getRecordKey())); assertTrue(filter.mightContain(record2.getRecordKey())); @@ -491,49 +498,4 @@ public class TestHoodieBloomIndex { } } - private String writeParquetFile(String partitionPath, List records, Schema schema, BloomFilter filter, - boolean createCommitTime) throws IOException, InterruptedException { - Thread.sleep(1000); - String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); - String fileId = UUID.randomUUID().toString(); - String filename = FSUtils.makeDataFileName(commitTime, 1, fileId); - - return writeParquetFile(partitionPath, filename, records, schema, filter, createCommitTime); - } - - private String writeParquetFile(String partitionPath, String filename, List records, Schema schema, - BloomFilter filter, boolean createCommitTime) throws IOException { - - if (filter == null) { - filter = new BloomFilter(10000, 0.0000001); - } - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, - filter); - String commitTime = FSUtils.getCommitTime(filename); - HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, - ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, - HoodieTestUtils.getDefaultHadoopConf(), - Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); - HoodieParquetWriter writer = new HoodieParquetWriter( - commitTime, - new Path(basePath + "/" + partitionPath + "/" + filename), - config, - schema); - int seqId = 1; - for (HoodieRecord record : records) { - GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); - HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++); - HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), 
record.getPartitionPath(), filename); - writer.writeAvro(record.getRecordKey(), avroRecord); - filter.add(record.getRecordKey()); - } - writer.close(); - - if (createCommitTime) { - // Also make sure the commit is valid - new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); - new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + ".commit").createNewFile(); - } - return filename; - } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java new file mode 100644 index 000000000..8f222b37f --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java @@ -0,0 +1,308 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.index.bloom; + +import static org.junit.Assert.*; + +import com.google.common.collect.Lists; +import com.uber.hoodie.common.HoodieClientTestUtils; +import com.uber.hoodie.common.TestRawTripPayload; +import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.table.HoodieTable; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.avro.Schema; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.Tuple2; + +public class TestHoodieGlobalBloomIndex { + + private JavaSparkContext jsc = null; + private String basePath = null; + private transient FileSystem fs; + private String schemaStr; + private Schema schema; + + public TestHoodieGlobalBloomIndex() throws Exception { + } + + @Before + public void init() throws IOException { + // Initialize a local spark env + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieGlobalBloomIndex")); + // Create a temp folder as the base path + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath(); + fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); + HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); + // We have some records to be tagged (two 
different partitions) + schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); + schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); + } + + @After + public void clean() { + if (basePath != null) { + new File(basePath).delete(); + } + if (jsc != null) { + jsc.stop(); + } + } + + @Test + public void testLoadInvolvedFiles() throws IOException { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + + // Create some partitions, and put some files, along with the meta file + // "2016/01/21": 0 file + // "2016/04/01": 1 file (2_0_20160401010101.parquet) + // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, + // 4_0_20150312101010.parquet) + new File(basePath + "/2016/01/21").mkdirs(); + new File(basePath + "/2016/01/21/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + new File(basePath + "/2016/04/01").mkdirs(); + new File(basePath + "/2016/04/01/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + new File(basePath + "/2015/03/12").mkdirs(); + new File(basePath + "/2015/03/12/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + + TestRawTripPayload rowChange1 = new TestRawTripPayload( + "{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), + rowChange1); + TestRawTripPayload rowChange2 = new TestRawTripPayload( + "{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), + rowChange2); + TestRawTripPayload rowChange3 = new TestRawTripPayload( + "{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord 
record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), + rowChange3); + TestRawTripPayload rowChange4 = new TestRawTripPayload( + "{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), + rowChange4); + + HoodieClientTestUtils + .writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", + Lists.newArrayList(), schema, null, false); + HoodieClientTestUtils + .writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", + Lists.newArrayList(), schema, null, false); + HoodieClientTestUtils + .writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", + Arrays.asList(record1), schema, null, false); + HoodieClientTestUtils + .writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", + Arrays.asList(record2, record3, record4), schema, null, false); + + // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up + List partitions = Arrays.asList("2016/01/21", "2016/04/01"); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + // partitions will NOT be respected by this loadInvolvedFiles(...) 
call + List> filesList = index.loadInvolvedFiles(partitions, jsc, table); + // Still 0, as no valid commit + assertEquals(0, filesList.size()); + + // Add commits whose times match the ones embedded in the data file names + new File(basePath + "/.hoodie").mkdirs(); + new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); + new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); + + table = HoodieTable.getHoodieTable(metadata, config, jsc); + filesList = index.loadInvolvedFiles(partitions, jsc, table); + assertEquals(4, filesList.size()); + + Map filesMap = toFileMap(filesList); + // key ranges checks + assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMaxRecordKey()); + assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMinRecordKey()); + assertFalse(filesMap.get("2015/03/12/1_0_20150312101010.parquet").hasKeyRanges()); + assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMaxRecordKey()); + assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMinRecordKey()); + assertTrue(filesMap.get("2015/03/12/3_0_20150312101010.parquet").hasKeyRanges()); + + Map expected = new HashMap<>(); + expected.put("2016/04/01/2_0_20160401010101.parquet", new BloomIndexFileInfo("2_0_20160401010101.parquet")); + expected.put("2015/03/12/1_0_20150312101010.parquet", new BloomIndexFileInfo("1_0_20150312101010.parquet")); + expected.put("2015/03/12/3_0_20150312101010.parquet", + new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")); + expected.put("2015/03/12/4_0_20150312101010.parquet", + new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003")); + + assertEquals(expected, filesMap); + } + + @Test + public void testExplodeRecordRDDWithFileComparisons() { + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + + final Map> partitionToFileIndexInfo = new HashMap<>(); + partitionToFileIndexInfo.put("2017/10/22", 
Arrays.asList(new BloomIndexFileInfo("f1"), + new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003"))); + + partitionToFileIndexInfo.put("2017/10/23", Arrays.asList( + new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); + + // the partition path of each incoming record is ignored when choosing candidate files + JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList( + new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), + new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t); + + List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( + partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); + + /* expecting: + f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23} + f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22} + f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22} + f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23} + f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22} + f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22} + f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23} + f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22} + f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23} + f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22} + */ + assertEquals(10, comparisonKeyList.size()); + + Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( + t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); + + assertEquals(4, recordKeyToFileComps.size()); + assertEquals(Arrays.asList("f4", "f1", "f3"), recordKeyToFileComps.get("002")); + assertEquals(Arrays.asList("f4", "f1", "f3"), recordKeyToFileComps.get("003")); + assertEquals(Arrays.asList("f4", "f1"), recordKeyToFileComps.get("004")); + 
assertEquals(Arrays.asList("f4", "f1"), recordKeyToFileComps.get("005")); + } + + + @Test + public void testTagLocation() throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + + // Create some partitions, and put some files, along with the meta file + // "2016/01/21": 0 file + // "2016/04/01": 1 file (holding record1) + // "2015/03/12": 3 files (one empty, one holding record2, + // one holding record4); file names are generated below + new File(basePath + "/2016/01/21").mkdirs(); + new File(basePath + "/2016/01/21/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + new File(basePath + "/2016/04/01").mkdirs(); + new File(basePath + "/2016/04/01/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + new File(basePath + "/2015/03/12").mkdirs(); + new File(basePath + "/2015/03/12/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + + TestRawTripPayload rowChange1 = new TestRawTripPayload( + "{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), + rowChange1); + TestRawTripPayload rowChange2 = new TestRawTripPayload( + "{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), + rowChange2); + TestRawTripPayload rowChange3 = new TestRawTripPayload( + "{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), + rowChange3); + + // this record will be saved in table and will be tagged to the incoming record5 + TestRawTripPayload rowChange4 = new TestRawTripPayload( + 
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), + rowChange4); + + // this has the same record key as record4 but different time so different partition, but globalbloomIndex should + // tag the original partition of the saved record4 + TestRawTripPayload rowChange5 = new TestRawTripPayload( + "{\"_row_key\":\"003\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record5 = new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), + rowChange5); + + JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); + + String filename0 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false); + String filename1 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false); + String filename2 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false); + String filename3 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false); + + // unlike testLoadInvolvedFiles, no partition list is passed: tagLocation only receives the incoming records + HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + + + // Make sure the metadata folder exists + new File(basePath + "/.hoodie").mkdirs(); + + // partitions will NOT be respected by this loadInvolvedFiles(...) 
call + JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); + + for (HoodieRecord record : taggedRecordRDD.collect()) { + if (record.getRecordKey().equals("000")) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0))); + } else if (record.getRecordKey().equals("001")) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); + } else if (record.getRecordKey().equals("002")) { + assertTrue(!record.isCurrentLocationKnown()); + } else if (record.getRecordKey().equals("003")) { // record5 shares its key with the stored record4 + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); + } + } + } + + // convert list to map to avoid sorting order dependencies + private Map toFileMap(List> filesList) { + Map filesMap = new HashMap<>(); + for (Tuple2 t : filesList) { + filesMap.put(t._1() + "/" + t._2().getFileName(), t._2()); + } + return filesMap; + } + +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 62877bd2c..eda4bc77c 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -126,6 +126,10 @@ public class HoodieTestUtils { } } + public static final void createMetadataFolder(String basePath) throws IOException { + new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); + } + public static final void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeInflightCommitFileName(