Added a filter function to filter the record keys in a parquet file

Author: Sunil Ramaiah
Date: 2018-05-17 15:40:47 -07:00
Committed by: vinoth chandar
Parent: 23d53763c4
Commit: a97814462d
3 changed files with 93 additions and 27 deletions
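
For orientation before the diffs: a minimal sketch of how the new ParquetUtils.filterParquetRowKeys API introduced by this commit is meant to be called. The caller class, the parquet path, and the candidate keys below are illustrative placeholders, and the com.uber.hoodie.common.util package for ParquetUtils is assumed from the surrounding code; only the two ParquetUtils methods come from this commit.

    import com.uber.hoodie.common.util.ParquetUtils;  // assumed package, not shown in the diff
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class FilterRowKeysExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Path parquetFile = new Path(args[0]);  // an existing hoodie parquet file

        Set<String> candidateKeys = new HashSet<>();
        candidateKeys.add("key-0001");  // placeholder record keys
        candidateKeys.add("key-0002");

        // Keeps only the candidate keys that are actually present in the file.
        Set<String> present = ParquetUtils.filterParquetRowKeys(conf, parquetFile, candidateKeys);
        System.out.println("Found " + present.size() + " of " + candidateKeys.size() + " candidates");

        // An empty filter returns every row key; readRowKeysFromParquet now delegates to that path.
        Set<String> allKeys = ParquetUtils.readRowKeysFromParquet(conf, parquetFile);
        System.out.println("File contains " + allKeys.size() + " row keys in total");
      }
    }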

HoodieBloomIndexCheckFunction.java

@@ -26,6 +26,7 @@ import com.uber.hoodie.exception.HoodieIndexException;
 import com.uber.hoodie.func.LazyIterableIterator;
 import com.uber.hoodie.table.HoodieTable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -64,16 +65,9 @@ public class HoodieBloomIndexCheckFunction implements
       try {
         // Load all rowKeys from the file, to double-confirm
         if (!candidateRecordKeys.isEmpty()) {
-          Set<String> fileRowKeys = ParquetUtils.readRowKeysFromParquet(configuration, filePath);
-          logger.info("Loading " + fileRowKeys.size() + " row keys from " + filePath);
-          if (logger.isDebugEnabled()) {
-            logger.debug("Keys from " + filePath + " => " + fileRowKeys);
-          }
-          for (String rowKey : candidateRecordKeys) {
-            if (fileRowKeys.contains(rowKey)) {
-              foundRecordKeys.add(rowKey);
-            }
-          }
+          Set<String> fileRowKeys = ParquetUtils.filterParquetRowKeys(configuration, filePath,
+              new HashSet<>(candidateRecordKeys));
+          foundRecordKeys.addAll(fileRowKeys);
           logger.info("After checking with row keys, we have " + foundRecordKeys.size()
               + " results, for file " + filePath + " => " + foundRecordKeys);
           if (logger.isDebugEnabled()) {
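In effect, the index check no longer reads every row key from the file and intersects in memory: the candidate set is pushed into the parquet scan, so fileRowKeys already contains only the keys that are both candidates and present in the file, and the removed contains() loop collapses into a simple addAll.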

ParquetUtils.java

@@ -29,9 +29,12 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -52,8 +55,26 @@ public class ParquetUtils {
    *
    * @param filePath      The parquet file path.
    * @param configuration configuration to build fs object
    * @return Set Set of row keys
    */
   public static Set<String> readRowKeysFromParquet(Configuration configuration, Path filePath) {
+    return filterParquetRowKeys(configuration, filePath, new HashSet<>());
+  }
+
+  /**
+   * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty,
+   * then this will return all the rowkeys.
+   *
+   * @param filePath      The parquet file path.
+   * @param configuration configuration to build fs object
+   * @param filter        record keys filter
+   * @return Set Set of row keys matching candidateRecordKeys
+   */
+  public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
+    Optional<RecordKeysFilterFunction> filterFunction = Optional.empty();
+    if (CollectionUtils.isNotEmpty(filter)) {
+      filterFunction = Optional.of(new RecordKeysFilterFunction(filter));
+    }
     Configuration conf = new Configuration(configuration);
     conf.addResource(getFs(filePath.toString(), conf).getConf());
     Schema readSchema = HoodieAvroUtils.getRecordKeySchema();
@@ -66,7 +87,10 @@ public class ParquetUtils {
       Object obj = reader.read();
       while (obj != null) {
         if (obj instanceof GenericRecord) {
-          rowKeys.add(((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString());
+          String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+          if (!filterFunction.isPresent() || filterFunction.get().apply(recordKey)) {
+            rowKeys.add(recordKey);
+          }
         }
         obj = reader.read();
       }
@@ -178,4 +202,20 @@ public class ParquetUtils {
     }
     return records;
   }
+
+  static class RecordKeysFilterFunction implements Function<String, Boolean> {
+
+    private final Set<String> candidateKeys;
+
+    RecordKeysFilterFunction(Set<String> candidateKeys) {
+      this.candidateKeys = candidateKeys;
+    }
+
+    @Override
+    public Boolean apply(String recordKey) {
+      if (candidateKeys.contains(recordKey)) {
+        return true;
+      }
+      return false;
+    }
+  }
 }
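
A note on the semantics above: when the caller passes an empty set, filterFunction stays Optional.empty() and every record key read from the file is kept, which is how readRowKeysFromParquet preserves its old behaviour by delegating with new HashSet<>(). The nested RecordKeysFilterFunction is a thin Function<String, Boolean> wrapper around Set.contains; a compact equivalent, shown only to clarify the logic and not part of the commit, would be:

    // Illustrative only; the commit keeps the explicit nested class.
    java.util.function.Predicate<String> keyFilter =
        filter.isEmpty() ? key -> true : filter::contains;
    // rowKeys.add(recordKey) would then be guarded by keyFilter.test(recordKey).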

TestParquetUtils.java

@@ -26,7 +26,9 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -54,28 +56,13 @@ public class TestParquetUtils {
   @Test
   public void testHoodieWriteSupport() throws Exception {
     List<String> rowKeys = new ArrayList<>();
     for (int i = 0; i < 1000; i++) {
       rowKeys.add(UUID.randomUUID().toString());
     }
 
-    // Write out a parquet file
-    Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    BloomFilter filter = new BloomFilter(1000, 0.0001);
-    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema,
-        filter);
     String filePath = basePath + "/test.parquet";
-    ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,
-        120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
-    for (String rowKey : rowKeys) {
-      GenericRecord rec = new GenericData.Record(schema);
-      rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
-      writer.write(rec);
-      filter.add(rowKey);
-    }
-    writer.close();
+    writeParquetFile(filePath, rowKeys);
 
     // Read and verify
     List<String> rowKeysInFile = new ArrayList<>(
@@ -90,4 +77,49 @@ public class TestParquetUtils {
       assertTrue("key should be found in bloom filter", filterInFile.mightContain(rowKey));
     }
   }
+
+  @Test
+  public void testFilterParquetRowKeys() throws Exception {
+    List<String> rowKeys = new ArrayList<>();
+    Set<String> filter = new HashSet<>();
+    for (int i = 0; i < 1000; i++) {
+      String rowKey = UUID.randomUUID().toString();
+      rowKeys.add(rowKey);
+      if (i % 100 == 0) {
+        filter.add(rowKey);
+      }
+    }
+
+    String filePath = basePath + "/test.parquet";
+    writeParquetFile(filePath, rowKeys);
+
+    // Read and verify
+    Set<String> filtered = ParquetUtils.filterParquetRowKeys(HoodieTestUtils.getDefaultHadoopConf(),
+        new Path(filePath), filter);
+
+    assertEquals("Filtered count does not match", filter.size(), filtered.size());
+    for (String rowKey : filtered) {
+      assertTrue("filtered key must be in the given filter", filter.contains(rowKey));
+    }
+  }
+
+  private void writeParquetFile(String filePath, List<String> rowKeys) throws Exception {
+    // Write out a parquet file
+    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    BloomFilter filter = new BloomFilter(1000, 0.0001);
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema,
+        filter);
+    ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,
+        120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
+    for (String rowKey : rowKeys) {
+      GenericRecord rec = new GenericData.Record(schema);
+      rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
+      writer.write(rec);
+      filter.add(rowKey);
+    }
+    writer.close();
+  }
 }
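
A further check one could layer onto the new test, sketched here but not part of the commit: with an empty filter, filterParquetRowKeys should return every key that was written, matching readRowKeysFromParquet.

    // Hypothetical extra assertion (would sit at the end of testFilterParquetRowKeys):
    Set<String> all = ParquetUtils.filterParquetRowKeys(HoodieTestUtils.getDefaultHadoopConf(),
        new Path(filePath), new HashSet<>());
    assertEquals("Empty filter should return all row keys", rowKeys.size(), all.size());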