From a97814462d1b7b2b7a1a880df1e30bc5a8c3544b Mon Sep 17 00:00:00 2001 From: Sunil Ramaiah Date: Thu, 17 May 2018 15:40:47 -0700 Subject: [PATCH] Added a filter function to filter the record keys in a parquet file --- .../bloom/HoodieBloomIndexCheckFunction.java | 14 ++-- .../uber/hoodie/common/util/ParquetUtils.java | 42 +++++++++++- .../hoodie/common/util/TestParquetUtils.java | 64 ++++++++++++++----- 3 files changed, 93 insertions(+), 27 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java index 074ec56da..a52582455 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java @@ -26,6 +26,7 @@ import com.uber.hoodie.exception.HoodieIndexException; import com.uber.hoodie.func.LazyIterableIterator; import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -64,16 +65,9 @@ public class HoodieBloomIndexCheckFunction implements try { // Load all rowKeys from the file, to double-confirm if (!candidateRecordKeys.isEmpty()) { - Set fileRowKeys = ParquetUtils.readRowKeysFromParquet(configuration, filePath); - logger.info("Loading " + fileRowKeys.size() + " row keys from " + filePath); - if (logger.isDebugEnabled()) { - logger.debug("Keys from " + filePath + " => " + fileRowKeys); - } - for (String rowKey : candidateRecordKeys) { - if (fileRowKeys.contains(rowKey)) { - foundRecordKeys.add(rowKey); - } - } + Set fileRowKeys = ParquetUtils.filterParquetRowKeys(configuration, filePath, + new HashSet<>(candidateRecordKeys)); + foundRecordKeys.addAll(fileRowKeys); logger.info("After checking with row keys, we have " + foundRecordKeys.size() + " results, for file " + filePath 
+ " => " + foundRecordKeys); if (logger.isDebugEnabled()) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java index f4215c804..636bec583 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java @@ -29,9 +29,12 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; @@ -52,8 +55,26 @@ public class ParquetUtils { * * @param filePath The parquet file path. * @param configuration configuration to build fs object + * @return Set Set of row keys */ public static Set readRowKeysFromParquet(Configuration configuration, Path filePath) { + return filterParquetRowKeys(configuration, filePath, new HashSet<>()); + } + + /** + * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, + * then this will return all the rowkeys. + * + * @param filePath The parquet file path. 
+ * @param configuration configuration to build fs object + * @param filter record keys filter + * @return Set of row keys matching the given filter + */ + public static Set filterParquetRowKeys(Configuration configuration, Path filePath, Set filter) { + Optional filterFunction = Optional.empty(); + if (CollectionUtils.isNotEmpty(filter)) { + filterFunction = Optional.of(new RecordKeysFilterFunction(filter)); + } Configuration conf = new Configuration(configuration); conf.addResource(getFs(filePath.toString(), conf).getConf()); Schema readSchema = HoodieAvroUtils.getRecordKeySchema(); @@ -66,7 +87,10 @@ public class ParquetUtils { Object obj = reader.read(); while (obj != null) { if (obj instanceof GenericRecord) { - rowKeys.add(((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()); + String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + if (!filterFunction.isPresent() || filterFunction.get().apply(recordKey)) { + rowKeys.add(recordKey); + } } obj = reader.read(); } @@ -178,4 +202,20 @@ public class ParquetUtils { } return records; } + + static class RecordKeysFilterFunction implements Function { + private final Set candidateKeys; + + RecordKeysFilterFunction(Set candidateKeys) { + this.candidateKeys = candidateKeys; + } + + @Override + public Boolean apply(String recordKey) { + if (candidateKeys.contains(recordKey)) { + return true; + } + return false; + } + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java index 636df99e3..3d5949b6a 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestParquetUtils.java @@ -26,7 +26,9 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import 
java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -54,28 +56,13 @@ public class TestParquetUtils { @Test public void testHoodieWriteSupport() throws Exception { - List rowKeys = new ArrayList<>(); for (int i = 0; i < 1000; i++) { rowKeys.add(UUID.randomUUID().toString()); } - // Write out a parquet file - Schema schema = HoodieAvroUtils.getRecordKeySchema(); - BloomFilter filter = new BloomFilter(1000, 0.0001); - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, - filter); - String filePath = basePath + "/test.parquet"; - ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP, - 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE); - for (String rowKey : rowKeys) { - GenericRecord rec = new GenericData.Record(schema); - rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey); - writer.write(rec); - filter.add(rowKey); - } - writer.close(); + writeParquetFile(filePath, rowKeys); // Read and verify List rowKeysInFile = new ArrayList<>( @@ -90,4 +77,49 @@ public class TestParquetUtils { assertTrue("key should be found in bloom filter", filterInFile.mightContain(rowKey)); } } + + @Test + public void testFilterParquetRowKeys() throws Exception { + List rowKeys = new ArrayList<>(); + Set filter = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + String rowKey = UUID.randomUUID().toString(); + rowKeys.add(rowKey); + if (i % 100 == 0) { + filter.add(rowKey); + } + } + + String filePath = basePath + "/test.parquet"; + writeParquetFile(filePath, rowKeys); + + // Read and verify + Set filtered = ParquetUtils.filterParquetRowKeys(HoodieTestUtils.getDefaultHadoopConf(), + new Path(filePath), + filter); + + assertEquals("Filtered count does not match", filter.size(), filtered.size()); + + for (String rowKey : filtered) { + assertTrue("filtered key must 
be in the given filter", filter.contains(rowKey)); + } + } + + private void writeParquetFile(String filePath, + List rowKeys) throws Exception { + // Write out a parquet file + Schema schema = HoodieAvroUtils.getRecordKeySchema(); + BloomFilter filter = new BloomFilter(1000, 0.0001); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, + filter); + ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP, + 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE); + for (String rowKey : rowKeys) { + GenericRecord rec = new GenericData.Record(schema); + rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey); + writer.write(rec); + filter.add(rowKey); + } + writer.close(); + } }