1
0

[HUDI-3760] Adding capability to fetch Metadata Records by prefix (#5208)

- Adding capability to fetch Metadata Records by key prefix so that Data Skipping could fetch only Column Stats 
- Index records pertaining to the columns being queried by, instead of reading out whole Index.
- Fixed usages of HFileScanner in HFileReader. few code paths uses cached scanner if available. Other code paths uses its own HFileScanner w/ positional read. 

Brief change log
- Rebasing ColumnStatsIndexSupport to rely on HoodieBackedTableMetadata in lieu of reading t/h Spark DS
- Adding methods enabling key-prefix lookups to HoodiFileReader, HoodieHFileReader
- Wiring key-prefix lookup t/h LogRecordScanner impls
- Cleaning up HoodieHFileReader impl

Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
Alexey Kudinkin
2022-04-06 09:11:08 -07:00
committed by GitHub
parent 7612549bcc
commit 9e87d164b3
46 changed files with 1387 additions and 698 deletions

View File

@@ -81,7 +81,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordItr().forEachRemaining(readRecords::add);
blk.getRecordIterator().forEachRemaining(readRecords::add);
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,7 +155,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(readRecords::add);
}
}

View File

@@ -124,7 +124,7 @@ public class ExportCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
while (recordItr.hasNext()) {
IndexedRecord ir = recordItr.next();
// Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the

View File

@@ -122,7 +122,7 @@ public class HoodieLogFileCommand implements CommandMarker {
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -236,7 +236,7 @@ public class HoodieLogFileCommand implements CommandMarker {
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
allRecords.add(record);

View File

@@ -339,7 +339,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordItr().forEachRemaining(records::add);
blk.getRecordIterator().forEachRemaining(records::add);
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}

View File

@@ -107,7 +107,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
.withFileContext(context)
.create();
writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), schema.toString().getBytes());
}
@Override

View File

@@ -18,17 +18,6 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -39,7 +28,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -51,21 +50,25 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -124,7 +127,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
FileSystem fs = getFilePath().getFileSystem(conf);
HFile.Reader hfileReader = HoodieHFileUtils.createHFileReader(fs, getFilePath(), new CacheConfig(conf), conf);
assertEquals(getSchemaFromResource(TestHoodieHFileReaderWriter.class, schemaPath),
new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(KEY_SCHEMA.getBytes()))));
new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(SCHEMA_KEY.getBytes()))));
}
private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
@@ -142,7 +145,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
HoodieFileWriter<GenericRecord> writer = createWriter(avroSchema, populateMetaFields);
List<String> keys = new ArrayList<>();
Map<String, GenericRecord> recordMap = new HashMap<>();
Map<String, GenericRecord> recordMap = new TreeMap<>();
for (int i = 0; i < 100; i++) {
GenericRecord record = new GenericData.Record(avroSchema);
String key = String.format("%s%04d", "key", i);
@@ -163,24 +166,30 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
Configuration conf = new Configuration();
HoodieHFileReader hoodieHFileReader = (HoodieHFileReader) createReader(conf);
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
assertEquals(new ArrayList<>(recordMap.values()), records);
hoodieHFileReader.close();
for (int i = 0; i < 2; i++) {
int randomRowstoFetch = 5 + RANDOM.nextInt(10);
Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
List<String> rowsList = new ArrayList<>(rowsToFetch);
Collections.sort(rowsList);
hoodieHFileReader = (HoodieHFileReader) createReader(conf);
List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
assertEquals(result.size(), randomRowstoFetch);
List<GenericRecord> expectedRecords = rowsList.stream().map(recordMap::get).collect(Collectors.toList());
hoodieHFileReader = (HoodieHFileReader<GenericRecord>) createReader(conf);
List<GenericRecord> result = HoodieHFileReader.readRecords(hoodieHFileReader, rowsList);
assertEquals(expectedRecords, result);
result.forEach(entry -> {
assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
if (populateMetaFields && testAvroWithMeta) {
assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNotNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
});
hoodieHFileReader.close();
@@ -202,7 +211,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
fs.open(getFilePath()), (int) fs.getFileStatus(getFilePath()).getLen());
// Reading byte array in HFile format, without actual file path
HoodieHFileReader<GenericRecord> hfileReader =
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
assertEquals(NUM_RECORDS, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -217,7 +226,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
Iterator<GenericRecord> iterator = hfileReader.getRecordIterator(keys, avroSchema);
Iterator<GenericRecord> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);
List<Integer> expectedIds =
IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
@@ -233,6 +242,59 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
}
}
@Test
public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
writeFileWithSimpleSchema();
HoodieHFileReader<GenericRecord> hfileReader =
(HoodieHFileReader<GenericRecord>) createReader(new Configuration());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
List<String> keyPrefixes = Collections.singletonList("key");
Iterator<GenericRecord> iterator =
hfileReader.getRecordsByKeyPrefixIterator(keyPrefixes, avroSchema);
List<GenericRecord> recordsByPrefix = toStream(iterator).collect(Collectors.toList());
List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator()).collect(Collectors.toList());
assertEquals(allRecords, recordsByPrefix);
// filter for "key1" : entries from key10 to key19 should be matched
List<GenericRecord> expectedKey1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1")).collect(Collectors.toList());
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(expectedKey1s, recordsByPrefix);
// exact match
List<GenericRecord> expectedKey25 = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25")).collect(Collectors.toList());
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(expectedKey25, recordsByPrefix);
// no match. key prefix is beyond entries in file.
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(Collections.emptyList(), recordsByPrefix);
// no match. but keyPrefix is in between the entries found in file.
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(Collections.emptyList(), recordsByPrefix);
}
@ParameterizedTest
@ValueSource(strings = {
"/hudi_0_9_hbase_1_2_3", "/hudi_0_10_hbase_1_2_3", "/hudi_0_11_hbase_2_4_9"})
@@ -253,7 +315,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
HoodieHFileReader<GenericRecord> hfileReader =
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -261,7 +323,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
content = readHFileFromResources(complexHFile);
verifyHFileReader(HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchemaWithUDT.avsc");
assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));

View File

@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -217,7 +218,7 @@ public abstract class TestHoodieReaderWriterBase {
private void verifyFilterRowKeys(HoodieFileReader<GenericRecord> hoodieReader) {
Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toSet());
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toCollection(TreeSet::new));
List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
.mapToObj(i -> "key" + String.format("%02d", i)).sorted().collect(Collectors.toList());
assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)

View File

@@ -18,6 +18,8 @@
package org.apache.hudi
import org.apache.hudi.common.config.TypedProperties
object HoodieConversionUtils {
def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
@@ -26,4 +28,10 @@ object HoodieConversionUtils {
def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
if (opt.isPresent) Some(opt.get) else None
def toProperties(params: Map[String, String]): TypedProperties = {
val props = new TypedProperties()
params.foreach(kv => props.setProperty(kv._1, kv._2))
props
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -44,6 +46,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +72,8 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -84,6 +89,7 @@ import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
@@ -100,7 +106,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -736,12 +741,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (populateMetaFields) {
assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
});
}
@@ -977,12 +982,11 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord;
if (enableMetaFields) {
@@ -1068,15 +1072,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (enableMetaFields) {
assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
final String keyInPayload = (String) ((GenericRecord) entry)
.get(HoodieMetadataPayload.KEY_FIELD_NAME);
assertFalse(keyInPayload.isEmpty());
});
@@ -1383,6 +1387,139 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
testTableOperationsImpl(engineContext, writeConfig);
}
@Test
public void testColStatsPrefixLookup() throws IOException {
this.tableType = COPY_ON_WRITE;
initPath();
initSparkContexts("TestHoodieMetadata");
initFileSystem();
fs.mkdirs(new Path(basePath));
initTimelineService();
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// disable small file handling so that every insert goes to a new file group.
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withRollbackUsingMarkers(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withMetadataIndexColumnStats(true)
.enableFullScan(false)
.build())
.build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
String firstCommit = "0000001";
List<HoodieRecord> records = dataGen.generateInserts(firstCommit, 20);
AtomicInteger counter = new AtomicInteger();
List<HoodieRecord> processedRecords = records.stream().map(entry ->
new HoodieAvroRecord(new HoodieKey("key1_" + counter.getAndIncrement(), entry.getPartitionPath()), (HoodieRecordPayload) entry.getData()))
.collect(Collectors.toList());
client.startCommitWithTime(firstCommit);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), firstCommit).collect();
assertNoWriteErrors(writeStatuses);
// Write 2 (inserts)
String secondCommit = "0000002";
client.startCommitWithTime(secondCommit);
records = dataGen.generateInserts(secondCommit, 20);
AtomicInteger counter1 = new AtomicInteger();
processedRecords = records.stream().map(entry ->
new HoodieAvroRecord(new HoodieKey("key2_" + counter1.getAndIncrement(), entry.getPartitionPath()), (HoodieRecordPayload) entry.getData()))
.collect(Collectors.toList());
writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), secondCommit).collect();
assertNoWriteErrors(writeStatuses);
Map<String, Map<String, List<String>>> commitToPartitionsToFiles = new HashMap<>();
// populate commit -> partition -> file info to assist in validation and prefi
metaClient.getActiveTimeline().getInstants().forEach(entry -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(entry).get(), HoodieCommitMetadata.class);
String commitTime = entry.getTimestamp();
if (!commitToPartitionsToFiles.containsKey(commitTime)) {
commitToPartitionsToFiles.put(commitTime, new HashMap<>());
}
commitMetadata.getPartitionToWriteStats().entrySet()
.stream()
.forEach(partitionWriteStat -> {
String partitionStatName = partitionWriteStat.getKey();
List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
String partition = HoodieTableMetadataUtil.getPartition(partitionStatName);
if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
}
writeStats.forEach(writeStat -> commitToPartitionsToFiles.get(commitTime).get(partition).add(writeStat.getPath()));
});
} catch (IOException e) {
e.printStackTrace();
}
});
HoodieTableMetadata tableMetadata = metadata(client);
// prefix search for column (_hoodie_record_key)
ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should be 6.
assertEquals(result.size(), 6);
result.forEach(entry -> {
//LOG.warn("Prefix search entries just for record key col : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
});
// prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
// LOG.warn("Prefix search entries for record key col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
String fileName = metadataColumnStats.getFileName();
if (fileName.contains(firstCommit)) {
assertTrue(commitToPartitionsToFiles.get(firstCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
} else {
assertTrue(commitToPartitionsToFiles.get(secondCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
}
});
// prefix search for column {commit time} and first partition
columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
// LOG.warn("Prefix search entries for record key col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
// for commit time column, min max should be the same since we disable small files, every commit will create a new file
assertEquals(metadataColumnStats.getMinValue(), metadataColumnStats.getMaxValue());
String fileName = metadataColumnStats.getFileName();
if (fileName.contains(firstCommit)) {
assertTrue(commitToPartitionsToFiles.get(firstCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
} else {
assertTrue(commitToPartitionsToFiles.get(secondCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
}
});
}
}
/**
* Test all major table operations with the given table, config and context.
*
@@ -1476,8 +1613,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"1000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -1540,7 +1677,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -1871,7 +2008,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -2364,7 +2501,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord;
final GenericRecord colStatsRecord = (GenericRecord) record.get(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS);

View File

@@ -21,9 +21,9 @@ package org.apache.hudi.client.functional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -51,8 +51,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -292,7 +290,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord;
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
@@ -361,10 +359,10 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
final String keyInPayload = (String) ((GenericRecord) entry)
.get(HoodieMetadataPayload.KEY_FIELD_NAME);
assertFalse(keyInPayload.isEmpty());
});

View File

@@ -67,7 +67,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
/**
* Utility methods to aid testing inside the HoodieClient module.
@@ -247,7 +247,7 @@ public class HoodieClientTestUtils {
HFile.Reader reader =
HoodieHFileUtils.createHFileReader(fs, new Path(path), cacheConfig, fs.getConf());
if (schema == null) {
schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(KEY_SCHEMA.getBytes())));
schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(SCHEMA_KEY.getBytes())));
}
HFileScanner scanner = reader.getScanner(false, false);
if (!scanner.seekTo()) {

View File

@@ -69,7 +69,7 @@ public abstract class BaseHoodieTableFileIndex {
private final String[] partitionColumns;
private final FileSystemViewStorageConfig fileSystemStorageConfig;
private final HoodieMetadataConfig metadataConfig;
protected final HoodieMetadataConfig metadataConfig;
private final HoodieTableQueryType queryType;
private final Option<String> specifiedQueryInstant;

View File

@@ -289,8 +289,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getString(DIR_FILTER_REGEX);
}
public boolean enableFullScan() {
return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
public boolean allowFullScan() {
return getBooleanOrDefault(ENABLE_FULL_SCAN_LOG_FILES);
}
public boolean populateMetaFields() {

View File

@@ -57,7 +57,6 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
@@ -129,7 +128,7 @@ public abstract class AbstractHoodieLogRecordReader {
// Store the last instant log blocks (needed to implement rollback)
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
// Enables full scan of log records
protected final boolean enableFullScan;
protected final boolean forceFullScan;
private int totalScannedLogFiles;
// Progress
private float progress = 0.0f;
@@ -150,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean enableFullScan,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionName, InternalSchema internalSchema) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
@@ -167,7 +166,7 @@ public abstract class AbstractHoodieLogRecordReader {
this.bufferSize = bufferSize;
this.instantRange = instantRange;
this.withOperationField = withOperationField;
this.enableFullScan = enableFullScan;
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
this.path = basePath;
@@ -189,10 +188,14 @@ public abstract class AbstractHoodieLogRecordReader {
}
public synchronized void scan() {
scan(Option.empty());
scanInternal(Option.empty());
}
public synchronized void scan(Option<List<String>> keys) {
public synchronized void scan(List<String> keys) {
scanInternal(Option.of(new KeySpec(keys, true)));
}
protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -205,15 +208,16 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
try {
// Get the key field based on populate meta fields config
// and the table type
final String keyField = getKeyField();
// Iterate over the paths
boolean enableRecordLookups = !forceFullScan;
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan, keyField, internalSchema);
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -250,7 +254,7 @@ public abstract class AbstractHoodieLogRecordReader {
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store the current block
currentInstantLogBlocks.push(logBlock);
@@ -260,7 +264,7 @@ public abstract class AbstractHoodieLogRecordReader {
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
@@ -335,7 +339,7 @@ public abstract class AbstractHoodieLogRecordReader {
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
@@ -370,11 +374,11 @@ public abstract class AbstractHoodieLogRecordReader {
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
*/
private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
try (ClosableIterator<IndexedRecord> recordItr = dataBlock.getRecordItr(keys.orElse(Collections.emptyList()))) {
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
try (ClosableIterator<IndexedRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
Option<Schema> schemaOption = getMergedSchema(dataBlock);
while (recordItr.hasNext()) {
IndexedRecord currentRecord = recordItr.next();
while (recordIterator.hasNext()) {
IndexedRecord currentRecord = recordIterator.next();
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
@@ -449,23 +453,20 @@ public abstract class AbstractHoodieLogRecordReader {
* Process the set of log blocks belonging to the last instant which is read fully.
*/
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
Option<List<String>> keys) throws Exception {
Option<KeySpec> keySpecOpt) throws Exception {
while (!logBlocks.isEmpty()) {
LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
processDataBlock((HoodieAvroDataBlock) lastBlock, keySpecOpt);
break;
case HFILE_DATA_BLOCK:
if (!keys.isPresent()) {
keys = Option.of(Collections.emptyList());
}
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
processDataBlock((HoodieHFileDataBlock) lastBlock, keySpecOpt);
break;
case PARQUET_DATA_BLOCK:
processDataBlock((HoodieParquetDataBlock) lastBlock, keys);
processDataBlock((HoodieParquetDataBlock) lastBlock, keySpecOpt);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
@@ -481,6 +482,15 @@ public abstract class AbstractHoodieLogRecordReader {
progress = numLogFilesSeen - 1 / logFilePaths.size();
}
private ClosableIterator<IndexedRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
if (keySpecOpt.isPresent()) {
KeySpec keySpec = keySpecOpt.get();
return dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey);
}
return dataBlock.getRecordIterator();
}
/**
* Return progress of scanning as a float between 0.0 to 1.0.
*/
@@ -504,7 +514,7 @@ public abstract class AbstractHoodieLogRecordReader {
return payloadClassFQN;
}
protected Option<String> getPartitionName() {
public Option<String> getPartitionName() {
return partitionName;
}
@@ -520,6 +530,16 @@ public abstract class AbstractHoodieLogRecordReader {
return withOperationField;
}
protected static class KeySpec {
private final List<String> keys;
private final boolean fullKey;
public KeySpec(List<String> keys, boolean fullKey) {
this.keys = keys;
this.fullKey = fullKey;
}
}
/**
* Builder used to build {@code AbstractHoodieLogRecordScanner}.
*/

View File

@@ -53,13 +53,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
String recordKeyField) throws IOException {
this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, enableInlineReading, recordKeyField, InternalSchema.getEmptyInternalSchema());
}
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
String recordKeyField, InternalSchema internalSchema) throws IOException {
this.logFiles = logFiles;
this.fs = fs;
@@ -69,12 +63,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
this.recordKeyField = recordKeyField;
this.enableInlineReading = enableInlineReading;
this.enableInlineReading = enableRecordLookups;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
enableInlineReading, recordKeyField, internalSchema);
enableRecordLookups, recordKeyField, internalSchema);
}
}

View File

@@ -45,6 +45,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
* be used as a lookup table when merging the base columnar file with the redo log file.
@@ -76,14 +78,14 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange, boolean autoScan,
Option<InstantRange> instantRange,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
boolean withOperationField, boolean enableFullScan,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionName, InternalSchema internalSchema) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField,
enableFullScan, partitionName, internalSchema);
forceFullScan, partitionName, internalSchema);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -93,7 +95,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
if (autoScan) {
if (forceFullScan) {
performScan();
}
}
@@ -115,10 +117,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return records.iterator();
}
public Map<String, HoodieRecord<? extends HoodieRecordPayload>> getRecords() {
checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return records;
}
@@ -211,8 +215,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// incremental filtering
protected Option<InstantRange> instantRange = Option.empty();
protected String partitionName;
// auto scan default true
private boolean autoScan = true;
// operation field default false
private boolean withOperationField = false;
@@ -290,11 +292,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return this;
}
public Builder withAutoScan(boolean autoScan) {
this.autoScan = autoScan;
return this;
}
public Builder withInternalSchema(InternalSchema internalSchema) {
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
return this;
@@ -315,7 +312,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
public HoodieMergedLogRecordScanner build() {
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange, autoScan,
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true,
Option.ofNullable(partitionName), internalSchema);
}

View File

@@ -314,7 +314,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
output.write(schemaContent);
List<IndexedRecord> records = new ArrayList<>();
try (ClosableIterator<IndexedRecord> recordItr = getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = getRecordIterator()) {
recordItr.forEachRemaining(records::add);
}

View File

@@ -138,7 +138,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
/**
* Returns all the records iterator contained w/in this block.
*/
public final ClosableIterator<IndexedRecord> getRecordItr() {
public final ClosableIterator<IndexedRecord> getRecordIterator() {
if (records.isPresent()) {
return list2Iterator(records.get());
}
@@ -162,21 +162,21 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
* @return List of IndexedRecords for the keys of interest.
* @throws IOException in case of failures encountered when reading/parsing records
*/
public final ClosableIterator<IndexedRecord> getRecordItr(List<String> keys) throws IOException {
public final ClosableIterator<IndexedRecord> getRecordIterator(List<String> keys, boolean fullKey) throws IOException {
boolean fullScan = keys.isEmpty();
if (enablePointLookups && !fullScan) {
return lookupRecords(keys);
return lookupRecords(keys, fullKey);
}
// Otherwise, we fetch all the records and filter out all the records, but the
// ones requested
ClosableIterator<IndexedRecord> allRecords = getRecordItr();
ClosableIterator<IndexedRecord> allRecords = getRecordIterator();
if (fullScan) {
return allRecords;
}
HashSet<String> keySet = new HashSet<>(keys);
return FilteringIterator.getInstance(allRecords, keySet, this::getRecordKey);
return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey);
}
protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
@@ -193,7 +193,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
}
}
protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
throw new UnsupportedOperationException(
String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
);
@@ -252,21 +252,25 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
private final ClosableIterator<T> nested; // nested iterator
private final Set<String> keys; // the filtering keys
private final boolean fullKey;
private final Function<T, Option<String>> keyExtract; // function to extract the key
private T next;
private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, Function<T, Option<String>> keyExtract) {
private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, boolean fullKey, Function<T, Option<String>> keyExtract) {
this.nested = nested;
this.keys = keys;
this.fullKey = fullKey;
this.keyExtract = keyExtract;
}
public static <T extends IndexedRecord> FilteringIterator<T> getInstance(
ClosableIterator<T> nested,
Set<String> keys,
boolean fullKey,
Function<T, Option<String>> keyExtract) {
return new FilteringIterator<>(nested, keys, keyExtract);
return new FilteringIterator<>(nested, keys, fullKey, keyExtract);
}
@Override
@@ -278,7 +282,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
public boolean hasNext() {
while (this.nested.hasNext()) {
this.next = this.nested.next();
if (keys.contains(keyExtract.apply(this.next).orElse(null))) {
String key = keyExtract.apply(this.next)
.orElseGet(() -> {
throw new IllegalStateException(String.format("Record without a key (%s)", this.next));
});
if (fullKey && keys.contains(key)
|| !fullKey && keys.stream().anyMatch(key::startsWith)) {
return true;
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
@@ -47,6 +48,7 @@ import org.apache.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -149,6 +151,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
});
writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), getSchema().toString().getBytes());
writer.close();
ostream.flush();
ostream.close();
@@ -163,11 +167,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
FileSystem fs = FSUtils.getFs(pathForReader.toString(), new Configuration());
// Read the content
HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(
FSUtils.getFs(pathForReader.toString(), new Configuration()), pathForReader, content);
// Sets up the writer schema
reader.withSchema(writerSchema);
HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema));
Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);
return new ClosableIterator<IndexedRecord>() {
@Override
@@ -189,7 +191,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// TODO abstract this w/in HoodieDataBlock
@Override
protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure configuration
@@ -204,13 +206,18 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());
// HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
Collections.sort(keys);
// HFile read will be efficient if keys are sorted, since on storage records are sorted by key.
// This will avoid unnecessary seeks.
List<String> sortedKeys = new ArrayList<>(keys);
Collections.sort(sortedKeys);
final HoodieHFileReader<IndexedRecord> reader =
new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
// Get writer's schema from the header
final ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator(keys, readerSchema);
final ClosableIterator<IndexedRecord> recordIterator =
fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
return new ClosableIterator<IndexedRecord>() {
@Override
public boolean hasNext() {

View File

@@ -257,7 +257,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordItr()) {
try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordIterator()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window
.filter(r -> commitsFilter.apply((GenericRecord) r))

View File

@@ -32,9 +32,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class CollectionUtils {
@@ -48,6 +51,16 @@ public class CollectionUtils {
return !isNullOrEmpty(c);
}
/**
* Collects provided {@link Iterator} to a {@link Stream}
*/
public static <T> Stream<T> toStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
false
);
}
/**
* Combines provided arrays into one
*/

View File

@@ -18,32 +18,28 @@
package org.apache.hudi.io.storage;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Set;
public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
public String[] readMinMaxRecordKeys();
String[] readMinMaxRecordKeys();
public BloomFilter readBloomFilter();
BloomFilter readBloomFilter();
public Set<String> filterRowKeys(Set<String> candidateRowKeys);
Set<String> filterRowKeys(Set<String> candidateRowKeys);
default Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
throw new UnsupportedOperationException();
}
ClosableIterator<R> getRecordIterator(Schema readerSchema) throws IOException;
public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
default Iterator<R> getRecordIterator() throws IOException {
default ClosableIterator<R> getRecordIterator() throws IOException {
return getRecordIterator(getSchema());
}
@@ -55,6 +51,22 @@ public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable
return getRecordByKey(key, getSchema());
}
default ClosableIterator<R> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
throw new UnsupportedOperationException();
}
default ClosableIterator<R> getRecordsByKeysIterator(List<String> keys) throws IOException {
return getRecordsByKeysIterator(keys, getSchema());
}
default ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
throw new UnsupportedEncodingException();
}
default ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
}
Schema getSchema();
void close();

View File

@@ -18,21 +18,10 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
@@ -44,97 +33,117 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.LazyRef;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
* <p>
* {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
*/
public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader<R> {
public static final String KEY_FIELD_NAME = "key";
public static final String KEY_SCHEMA = "schema";
// TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
public static final String SCHEMA_KEY = "schema";
public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
public static final String KEY_FIELD_NAME = "key";
public static final String KEY_MIN_RECORD = "minRecordKey";
public static final String KEY_MAX_RECORD = "maxRecordKey";
private static final Logger LOG = LogManager.getLogger(HoodieHFileReader.class);
private Path path;
private Configuration conf;
private HFile.Reader reader;
private FSDataInputStream fsDataInputStream;
private Schema schema;
// Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
// key retrieval.
private HFileScanner keyScanner;
private final Path path;
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException {
this.conf = configuration;
this.path = path;
this.reader = HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
private final LazyRef<Schema> schema;
// NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in Positional Read ("pread")
// mode (ie created w/ "pread = true")
private final HFile.Reader reader;
// NOTE: Scanner caches read blocks, therefore it's important to re-use scanner
// wherever possible
private final HFileScanner sharedScanner;
private final Object sharedScannerLock = new Object();
public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig) throws IOException {
this(path,
HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), hadoopConf), path, cacheConfig, hadoopConf),
Option.empty());
}
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
this.conf = configuration;
this.path = path;
this.fsDataInputStream = fs.open(path);
this.reader = HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, configuration);
public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), Option.empty());
}
public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content) throws IOException {
this.reader = HoodieHFileUtils.createHFileReader(fs, dummyPath, content);
public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content, Option<Schema> schemaOpt) throws IOException {
this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), schemaOpt);
}
public HoodieHFileReader(Path path, HFile.Reader reader, Option<Schema> schemaOpt) throws IOException {
this.path = path;
this.reader = reader;
// For shared scanner, which is primarily used for point-lookups, we're caching blocks
// by default, to minimize amount of traffic to the underlying storage
this.sharedScanner = getHFileScanner(reader, true);
this.schema = schemaOpt.map(LazyRef::eager)
.orElseGet(() -> LazyRef.lazy(() -> fetchSchema(reader)));
}
@Override
public String[] readMinMaxRecordKeys() {
// NOTE: This access to reader is thread-safe
HFileInfo fileInfo = reader.getHFileInfo();
return new String[] {new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
return new String[]{new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
}
@Override
public Schema getSchema() {
if (schema == null) {
HFileInfo fileInfo = reader.getHFileInfo();
schema = new Schema.Parser().parse(new String(fileInfo.get(KEY_SCHEMA.getBytes())));
}
return schema;
}
/**
* Sets up the writer schema explicitly.
*/
public void withSchema(Schema schema) {
this.schema = schema;
}
@Override
public BloomFilter readBloomFilter() {
HFileInfo fileInfo;
try {
fileInfo = reader.getHFileInfo();
ByteBuff serializedFilter = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
byte[] filterBytes = new byte[serializedFilter.remaining()];
serializedFilter.get(filterBytes); // read the bytes that were written
return BloomFilterFactory.fromString(new String(filterBytes),
// NOTE: This access to reader is thread-safe
HFileInfo fileInfo = reader.getHFileInfo();
ByteBuff buf = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
// We have to copy bytes here, since we can't reuse buffer's underlying
// array as is, since it contains additional metadata (header)
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
return BloomFilterFactory.fromString(new String(bytes),
new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
} catch (IOException e) {
throw new HoodieException("Could not read bloom filter from " + path, e);
}
}
@Override
public Schema getSchema() {
return schema.get();
}
/**
* Filter keys by availability.
* <p>
@@ -145,292 +154,423 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
*/
@Override
public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
return candidateRowKeys.stream().filter(k -> {
try {
return isKeyAvailable(k);
} catch (IOException e) {
LOG.error("Failed to check key availability: " + k);
return false;
}
}).collect(Collectors.toSet());
}
checkState(candidateRowKeys instanceof TreeSet,
String.format("HFile reader expects a TreeSet as iterating over ordered keys is more performant, got (%s)", candidateRowKeys.getClass().getSimpleName()));
@Override
public Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
return filterRecordsImpl(new TreeSet<>(rowKeys));
}
/**
* Filter records by sorted keys.
* <p>
* TODO: Implement single seek and sequential scan till the last candidate key
* instead of repeated seeks.
*
* @param sortedCandidateRowKeys - Sorted set of keys to fetch records for
* @return Map of keys to fetched records
* @throws IOException When the deserialization of records fail
*/
private synchronized Map<String, R> filterRecordsImpl(TreeSet<String> sortedCandidateRowKeys) throws IOException {
HashMap<String, R> filteredRecords = new HashMap<>();
for (String key : sortedCandidateRowKeys) {
Option<R> record = getRecordByKey(key);
if (record.isPresent()) {
filteredRecords.put(key, record.get());
}
}
return filteredRecords;
}
/**
* Reads all the records with given schema.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
private List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) {
final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
List<Pair<String, R>> recordList = new LinkedList<>();
try {
final HFileScanner scanner = reader.getScanner(false, false);
if (scanner.seekTo()) {
do {
Cell c = scanner.getCell();
final Pair<String, R> keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema);
recordList.add(keyAndRecordPair);
} while (scanner.next());
}
return recordList;
} catch (IOException e) {
throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
}
}
/**
* Reads all the records with current schema.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
public List<Pair<String, R>> readAllRecords() {
Schema schema = getSchema();
return readAllRecords(schema, schema);
}
/**
* Reads all the records with current schema and filtering keys.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
return readRecords(keys, getSchema());
}
/**
* Reads all the records with given schema and filtering keys.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
this.schema = schema;
List<Pair<String, R>> records = new ArrayList<>();
for (String key: keys) {
Option<R> value = getRecordByKey(key, schema);
if (value.isPresent()) {
records.add(new Pair(key, value.get()));
}
}
return records;
}
public ClosableIterator<R> getRecordIterator(List<String> keys, Schema schema) throws IOException {
this.schema = schema;
Iterator<String> iterator = keys.iterator();
return new ClosableIterator<R>() {
private R next;
@Override
public void close() {
}
@Override
public boolean hasNext() {
synchronized (sharedScannerLock) {
return candidateRowKeys.stream().filter(k -> {
try {
while (iterator.hasNext()) {
Option<R> value = getRecordByKey(iterator.next(), schema);
if (value.isPresent()) {
next = value.get();
return true;
}
}
return false;
return isKeyAvailable(k, sharedScanner);
} catch (IOException e) {
throw new HoodieIOException("unable to read next record from hfile ", e);
LOG.error("Failed to check key availability: " + k);
return false;
}
}
@Override
public R next() {
return next;
}
};
}).collect(Collectors.toSet());
}
}
@SuppressWarnings("unchecked")
@Override
public Iterator getRecordIterator(Schema readerSchema) throws IOException {
final HFileScanner scanner = reader.getScanner(false, false);
final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
ValidationUtils.checkState(keyFieldSchema != null,
"Missing key field '" + KEY_FIELD_NAME + "' in the schema!");
return new Iterator<R>() {
private R next = null;
private boolean eof = false;
@Override
public boolean hasNext() {
try {
// To handle when hasNext() is called multiple times for idempotency and/or the first time
if (this.next == null && !this.eof) {
if (!scanner.isSeeked() && scanner.seekTo()) {
final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
this.next = keyAndRecordPair.getSecond();
}
}
return this.next != null;
} catch (IOException io) {
throw new HoodieIOException("unable to read next record from hfile ", io);
}
}
@Override
public R next() {
try {
// To handle case when next() is called before hasNext()
if (this.next == null) {
if (!hasNext()) {
throw new HoodieIOException("No more records left to read from hfile");
}
}
R retVal = this.next;
if (scanner.next()) {
final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
this.next = keyAndRecordPair.getSecond();
} else {
this.next = null;
this.eof = true;
}
return retVal;
} catch (IOException io) {
throw new HoodieIOException("unable to read next record from parquet file ", io);
}
}
};
}
private boolean isKeyAvailable(String key) throws IOException {
final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
synchronized (this) {
if (keyScanner == null) {
keyScanner = reader.getScanner(false, false);
}
if (keyScanner.seekTo(kv) == 0) {
return true;
}
public Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
synchronized (sharedScannerLock) {
return (Option<R>) fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema);
}
return false;
}
@SuppressWarnings("unchecked")
@Override
public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
byte[] value = null;
final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
ValidationUtils.checkState(keyFieldSchema != null);
KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
synchronized (this) {
if (keyScanner == null) {
keyScanner = reader.getScanner(false, false);
}
if (keyScanner.seekTo(kv) == 0) {
Cell c = keyScanner.getCell();
// Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
}
}
if (value != null) {
R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema);
return Option.of(record);
}
return Option.empty();
public ClosableIterator<R> getRecordIterator(Schema readerSchema) throws IOException {
// TODO eval whether seeking scanner would be faster than pread
HFileScanner scanner = getHFileScanner(reader, false);
return (ClosableIterator<R>) new RecordIterator(scanner, getSchema(), readerSchema);
}
private Pair<String, R> getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema);
return new Pair<>(new String(keyBytes), record);
@SuppressWarnings("unchecked")
@Override
public ClosableIterator<R> getRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
// We're caching blocks for this scanner to minimize amount of traffic
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
return (ClosableIterator<R>) new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
}
/**
* Deserialize the record byte array contents to record object.
*
* @param keyBytes - Record key as byte array
* @param valueBytes - Record content as byte array
* @param writerSchema - Writer schema
* @param readerSchema - Reader schema
* @param keyFieldSchema - Key field id in the schema
* @return Deserialized record object
*/
private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema,
Option<Schema.Field> keyFieldSchema) throws IOException {
R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
materializeRecordIfNeeded(keyBytes, record, keyFieldSchema);
return record;
}
/**
* Materialize the record for any missing fields, if needed.
*
* @param keyBytes - Key byte array
* @param record - Record object to materialize
* @param keyFieldSchema - Key field id in the schema
*/
private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Option<Schema.Field> keyFieldSchema) {
if (keyFieldSchema.isPresent()) {
final Object keyObject = record.get(keyFieldSchema.get().pos());
if (keyObject != null && keyObject.toString().isEmpty()) {
record.put(keyFieldSchema.get().pos(), new String(keyBytes));
}
}
@SuppressWarnings("unchecked")
@Override
public ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
// We're caching blocks for this scanner to minimize amount of traffic
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
return (ClosableIterator<R>) new RecordByKeyPrefixIterator(scanner, keyPrefixes, getSchema(), readerSchema);
}
@Override
public long getTotalRecords() {
// NOTE: This access to reader is thread-safe
return reader.getEntries();
}
@Override
public synchronized void close() {
public void close() {
try {
reader.close();
reader = null;
if (fsDataInputStream != null) {
fsDataInputStream.close();
synchronized (this) {
reader.close();
}
keyScanner = null;
} catch (IOException e) {
throw new HoodieIOException("Error closing the hfile reader", e);
}
}
private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException {
final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
return keyScanner.seekTo(kv) == 0;
}
private static Iterator<GenericRecord> getRecordByKeyPrefixIteratorInternal(HFileScanner scanner,
String keyPrefix,
Schema writerSchema,
Schema readerSchema) throws IOException {
KeyValue kv = new KeyValue(keyPrefix.getBytes(), null, null, null);
// NOTE: HFile persists both keys/values as bytes, therefore lexicographical sorted is
// essentially employed
//
// For the HFile containing list of cells c[0], c[1], ..., c[N], `seekTo(cell)` would return
// following:
// a) -1, if cell < c[0], no position;
// b) 0, such that c[i] = cell and scanner is left in position i;
// c) and 1, such that c[i] < cell, and scanner is left in position i.
//
// Consider entries w/ the following keys in HFile: [key01, key02, key03, key04,..., key20];
// In case looked up key-prefix is
// - "key", `seekTo()` will return -1 and place the cursor just before "key01",
// `getCell()` will return "key01" entry
// - "key03", `seekTo()` will return 0 (exact match) and place the cursor just before "key03",
// `getCell()` will return "key03" entry
// - "key1", `seekTo()` will return 1 (first not lower than) and place the cursor just before
// "key10" (i.e. on "key09");
//
int val = scanner.seekTo(kv);
if (val == 1) {
// Try moving to next entry, matching the prefix key; if we're at the EOF,
// `next()` will return false
if (!scanner.next()) {
return Collections.emptyIterator();
}
}
class KeyPrefixIterator implements Iterator<GenericRecord> {
private GenericRecord next = null;
private boolean eof = false;
@Override
public boolean hasNext() {
if (next != null) {
return true;
} else if (eof) {
return false;
}
Cell c = Objects.requireNonNull(scanner.getCell());
byte[] keyBytes = copyKeyFromCell(c);
String key = new String(keyBytes);
// Check whether we're still reading records corresponding to the key-prefix
if (!key.startsWith(keyPrefix)) {
return false;
}
// Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
byte[] valueBytes = copyValueFromCell(c);
try {
next = deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
// In case scanner is not able to advance, it means we reached EOF
eof = !scanner.next();
} catch (IOException e) {
throw new HoodieIOException("Failed to deserialize payload", e);
}
return true;
}
@Override
public GenericRecord next() {
GenericRecord next = this.next;
this.next = null;
return next;
}
}
return new KeyPrefixIterator();
}
private static Option<GenericRecord> fetchRecordByKeyInternal(HFileScanner scanner, String key, Schema writerSchema, Schema readerSchema) throws IOException {
KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
if (scanner.seekTo(kv) != 0) {
return Option.empty();
}
Cell c = scanner.getCell();
byte[] valueBytes = copyValueFromCell(c);
GenericRecord record = deserialize(key.getBytes(), valueBytes, writerSchema, readerSchema);
return Option.of(record);
}
private static GenericRecord getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema) throws IOException {
final byte[] keyBytes = copyKeyFromCell(cell);
final byte[] valueBytes = copyValueFromCell(cell);
return deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
}
private static GenericRecord deserializeUnchecked(final byte[] keyBytes,
final byte[] valueBytes,
Schema writerSchema,
Schema readerSchema) {
try {
return deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
} catch (IOException e) {
throw new HoodieIOException("Failed to deserialize payload", e);
}
}
private static GenericRecord deserialize(final byte[] keyBytes,
final byte[] valueBytes,
Schema writerSchema,
Schema readerSchema) throws IOException {
GenericRecord record = HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
getKeySchema(readerSchema).ifPresent(keyFieldSchema -> {
final Object keyObject = record.get(keyFieldSchema.pos());
if (keyObject != null && keyObject.toString().isEmpty()) {
record.put(keyFieldSchema.pos(), new String(keyBytes));
}
});
return record;
}
private static Schema fetchSchema(HFile.Reader reader) {
HFileInfo fileInfo = reader.getHFileInfo();
return new Schema.Parser().parse(new String(fileInfo.get(SCHEMA_KEY.getBytes())));
}
private static byte[] copyKeyFromCell(Cell cell) {
return Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
}
private static byte[] copyValueFromCell(Cell c) {
return Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
}
/**
* NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
* <p>
* Reads all the records with given schema
*/
public static <R extends IndexedRecord> List<R> readAllRecords(HoodieHFileReader<R> reader) throws IOException {
Schema schema = reader.getSchema();
return toStream(reader.getRecordIterator(schema))
.collect(Collectors.toList());
}
/**
* NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
* <p>
* Reads all the records with given schema and filtering keys.
*/
public static <R extends IndexedRecord> List<R> readRecords(HoodieHFileReader<R> reader,
List<String> keys) throws IOException {
return readRecords(reader, keys, reader.getSchema());
}
/**
* NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
* <p>
* Reads all the records with given schema and filtering keys.
*/
public static <R extends IndexedRecord> List<R> readRecords(HoodieHFileReader<R> reader,
List<String> keys,
Schema schema) throws IOException {
Collections.sort(keys);
return toStream(reader.getRecordsByKeysIterator(keys, schema))
.collect(Collectors.toList());
}
private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) {
// NOTE: Only scanners created in Positional Read ("pread") mode could share the same reader,
// since scanners in default mode will be seeking w/in the underlying stream
return reader.getScanner(cacheBlocks, true);
}
private static Option<Schema.Field> getKeySchema(Schema schema) {
return Option.ofNullable(schema.getField(KEY_FIELD_NAME));
}
private static class RecordByKeyPrefixIterator implements ClosableIterator<GenericRecord> {
private final Iterator<String> keyPrefixesIterator;
private Iterator<GenericRecord> recordsIterator;
private final HFileScanner scanner;
private final Schema writerSchema;
private final Schema readerSchema;
private GenericRecord next = null;
RecordByKeyPrefixIterator(HFileScanner scanner, List<String> keyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException {
this.keyPrefixesIterator = keyPrefixes.iterator();
this.scanner = scanner;
this.scanner.seekTo(); // position at the beginning of the file
this.writerSchema = writerSchema;
this.readerSchema = readerSchema;
}
@Override
public boolean hasNext() {
try {
while (true) {
// NOTE: This is required for idempotency
if (next != null) {
return true;
} else if (recordsIterator != null && recordsIterator.hasNext()) {
next = recordsIterator.next();
return true;
} else if (keyPrefixesIterator.hasNext()) {
String currentKeyPrefix = keyPrefixesIterator.next();
recordsIterator =
getRecordByKeyPrefixIteratorInternal(scanner, currentKeyPrefix, writerSchema, readerSchema);
} else {
return false;
}
}
} catch (IOException e) {
throw new HoodieIOException("Unable to read next record from HFile", e);
}
}
@Override
public GenericRecord next() {
GenericRecord next = this.next;
this.next = null;
return next;
}
@Override
public void close() {
scanner.close();
}
}
private static class RecordByKeyIterator implements ClosableIterator<GenericRecord> {
private final Iterator<String> keyIterator;
private final HFileScanner scanner;
private final Schema readerSchema;
private final Schema writerSchema;
private GenericRecord next = null;
RecordByKeyIterator(HFileScanner scanner, List<String> keys, Schema writerSchema, Schema readerSchema) throws IOException {
this.keyIterator = keys.iterator();
this.scanner = scanner;
this.scanner.seekTo(); // position at the beginning of the file
this.writerSchema = writerSchema;
this.readerSchema = readerSchema;
}
@Override
public boolean hasNext() {
try {
// NOTE: This is required for idempotency
if (next != null) {
return true;
}
while (keyIterator.hasNext()) {
Option<GenericRecord> value = fetchRecordByKeyInternal(scanner, keyIterator.next(), writerSchema, readerSchema);
if (value.isPresent()) {
next = value.get();
return true;
}
}
return false;
} catch (IOException e) {
throw new HoodieIOException("unable to read next record from hfile ", e);
}
}
@Override
public GenericRecord next() {
GenericRecord next = this.next;
this.next = null;
return next;
}
@Override
public void close() {
scanner.close();
}
}
private static class RecordIterator implements ClosableIterator<GenericRecord> {
private final HFileScanner scanner;
private final Schema writerSchema;
private final Schema readerSchema;
private GenericRecord next = null;
RecordIterator(HFileScanner scanner, Schema writerSchema, Schema readerSchema) {
this.scanner = scanner;
this.writerSchema = writerSchema;
this.readerSchema = readerSchema;
}
@Override
public boolean hasNext() {
try {
// NOTE: This is required for idempotency
if (next != null) {
return true;
}
boolean hasRecords;
if (!scanner.isSeeked()) {
hasRecords = scanner.seekTo();
} else {
hasRecords = scanner.next();
}
if (!hasRecords) {
return false;
}
this.next = getRecordFromCell(scanner.getCell(), writerSchema, readerSchema);
return true;
} catch (IOException io) {
throw new HoodieIOException("unable to read next record from hfile ", io);
}
}
@Override
public GenericRecord next() {
GenericRecord next = this.next;
this.next = null;
return next;
}
@Override
public void close() {
scanner.close();
}
}
static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable, PositionedReadable {
public SeekableByteArrayInputStream(byte[] buf) {
super(buf);

View File

@@ -18,9 +18,6 @@
package org.apache.hudi.io.storage;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -29,6 +26,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.orc.OrcFile;
@@ -37,6 +35,9 @@ import org.apache.orc.Reader.Options;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.Set;
public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReader {
private Path path;
private Configuration conf;
@@ -64,12 +65,12 @@ public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReade
}
@Override
public Iterator<R> getRecordIterator(Schema schema) throws IOException {
public ClosableIterator<R> getRecordIterator(Schema schema) throws IOException {
try {
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
return new OrcReaderIterator(recordReader, schema, orcSchema);
return new OrcReaderIterator<>(recordReader, schema, orcSchema);
} catch (IOException io) {
throw new HoodieIOException("Unable to create an ORC reader.", io);
}

View File

@@ -18,12 +18,6 @@
package org.apache.hudi.io.storage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -31,12 +25,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
private final Path path;
@@ -66,10 +65,10 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
}
@Override
public Iterator<R> getRecordIterator(Schema schema) throws IOException {
public ClosableIterator<R> getRecordIterator(Schema schema) throws IOException {
AvroReadSupport.setAvroReadSchema(conf, schema);
ParquetReader<R> reader = AvroParquetReader.<R>builder(path).withConf(conf).build();
ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader);
ParquetReaderIterator<R> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
}
@@ -81,7 +80,7 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
@Override
public void close() {
readerIterators.forEach(entry -> entry.close());
readerIterators.forEach(ParquetReaderIterator::close);
}
@Override

View File

@@ -378,7 +378,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
public abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
protected HoodieEngineContext getEngineContext() {
return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());

View File

@@ -21,9 +21,11 @@ package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -159,4 +161,9 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
}
@Override
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
}
}

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.metadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -25,23 +28,24 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -49,10 +53,6 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -60,15 +60,22 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
/**
* Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -77,6 +84,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
private static final Schema METADATA_RECORD_SCHEMA = HoodieMetadataRecord.getClassSchema();
private String metadataBasePath;
// Metadata table's timeline and metaclient
private HoodieTableMetaClient metadataMetaClient;
@@ -133,28 +142,79 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
@Override
protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
String partitionName) {
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
String partitionName) {
// NOTE: Since we partition records to a particular file-group by full key, we will have
// to scan all file-groups for all key-prefixes as each of these might contain some
// records matching the key-prefix
List<FileSlice> partitionFileSlices =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
return engineContext.parallelize(partitionFileSlices)
.flatMap(
(SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
// NOTE: Since this will be executed by executors, we can't access previously cached
// readers, and therefore have to always open new ones
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReaders(partitionName, fileSlice);
try {
List<Long> timings = new ArrayList<>();
HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
// TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
return Collections.emptyIterator();
}
boolean fullKeys = false;
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);
LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
keyPrefixes.size(), timings));
return mergedRecords.iterator();
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + keyPrefixes.size() + " key : ", ioe);
} finally {
closeReader(readers);
}
}
)
.map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
.filter(Objects::nonNull);
}
@Override
public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
String partitionName) {
Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSliceToKeysMapping(partitionName, keys);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
AtomicInteger fileSlicesKeysCount = new AtomicInteger();
partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(partitionName,
partitionFileSlicePair.getRight());
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
try {
List<Long> timings = new ArrayList<>();
HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
return;
}
// local map to assist in merging with base file records
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner,
fileSliceKeys, timings);
result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, logRecords,
boolean fullKeys = true;
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, fullKeys, logRecords,
timings, partitionName));
LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
fileSliceKeys.size(), timings));
fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
@@ -171,81 +231,127 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
List<String> keys, List<Long> timings) {
List<String> keys,
boolean fullKey,
List<Long> timings) {
HoodieTimer timer = new HoodieTimer().startTimer();
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
// Retrieve records from log file
timer.startTimer();
if (logRecordScanner != null) {
if (metadataConfig.enableFullScan()) {
// path which does full scan of log files
for (String key : keys) {
logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
}
} else {
// this path will do seeks pertaining to the keys passed in
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList = logRecordScanner.getRecordsByKeys(keys);
for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
logRecords.put(entry.getKey(), entry.getValue());
}
if (logRecordScanner == null) {
timings.add(timer.endTimer());
return Collections.emptyMap();
}
String partitionName = logRecordScanner.getPartitionName().get();
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
if (isFullScanAllowedForPartition(partitionName)) {
checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
// Path which does full scan of log files
for (String key : keys) {
logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
}
} else {
for (String key : keys) {
logRecords.put(key, Option.empty());
// This path will do seeks pertaining to the keys passed in
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
fullKey ? logRecordScanner.getRecordsByKeys(keys)
: logRecordScanner.getRecordsByKeyPrefixes(keys)
.stream()
.map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
.collect(Collectors.toList());
for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
logRecords.put(entry.getKey(), entry.getValue());
}
}
timings.add(timer.endTimer());
return logRecords;
}
private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
List<String> keys, Map<String,
Option<HoodieRecord<HoodieMetadataPayload>>> logRecords, List<Long> timings, String partitionName) throws IOException {
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
// merge with base records
List<String> keys,
boolean fullKeys,
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
List<Long> timings,
String partitionName) throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
timer.startTimer();
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
// Retrieve record from base file
if (baseFileReader != null) {
HoodieTimer readTimer = new HoodieTimer();
Map<String, GenericRecord> baseFileRecords = baseFileReader.getRecordsByKeys(keys);
for (String key : keys) {
readTimer.startTimer();
if (baseFileRecords.containsKey(key)) {
hoodieRecord = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
// merge base file record w/ log record if present
if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecord.getKey(), mergedPayload))));
} else {
// only base record
result.add(Pair.of(key, Option.of(hoodieRecord)));
}
} else {
// only log record
result.add(Pair.of(key, logRecords.get(key)));
}
}
if (baseFileReader == null) {
// No base file at all
timings.add(timer.endTimer());
} else {
// no base file at all
timings.add(timer.endTimer());
for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
result.add(Pair.of(entry.getKey(), entry.getValue()));
if (fullKeys) {
// In case full-keys (not key-prefixes) were provided, it's expected that the list of
// records will contain an (optional) entry for each corresponding key
return keys.stream()
.map(key -> Pair.of(key, logRecords.getOrDefault(key, Option.empty())))
.collect(Collectors.toList());
} else {
return logRecords.entrySet().stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}
}
return result;
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
HoodieTimer readTimer = new HoodieTimer();
readTimer.startTimer();
Map<String, HoodieRecord<HoodieMetadataPayload>> records =
fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
// Iterate over all provided log-records, merging them into existing records
for (Option<HoodieRecord<HoodieMetadataPayload>> logRecordOpt : logRecords.values()) {
if (logRecordOpt.isPresent()) {
HoodieRecord<HoodieMetadataPayload> logRecord = logRecordOpt.get();
records.merge(
logRecord.getRecordKey(),
logRecord,
(oldRecord, newRecord) ->
new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
);
}
}
timings.add(timer.endTimer());
if (fullKeys) {
// In case full-keys (not key-prefixes) were provided, it's expected that the list of
// records will contain an (optional) entry for each corresponding key
return keys.stream()
.map(key -> Pair.of(key, Option.ofNullable(records.get(key))))
.collect(Collectors.toList());
} else {
return records.values().stream()
.map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
.collect(Collectors.toList());
}
}
private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
ValidationUtils.checkState(baseRecord.isPresent());
private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader<GenericRecord> baseFileReader,
List<String> keys,
boolean fullKeys,
String partitionName) throws IOException {
ClosableIterator<GenericRecord> records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys)
: baseFileReader.getRecordsByKeyPrefixIterator(keys);
return toStream(records)
.map(record -> Pair.of(
(String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME),
composeRecord(record, partitionName)))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
if (metadataTableConfig.populateMetaFields()) {
return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
}
return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()),
false, Option.of(partitionName));
@@ -279,34 +385,35 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
* Create a file reader and the record scanner for a given partition and file slice
* if readers are not already available.
*
* @param partitionName - Partition name
* @param slice - The file slice to open readers for
* @param partitionName - Partition name
* @param slice - The file slice to open readers for
* @return File reader and the record scanner pair for the requested file slice
*/
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String partitionName, FileSlice slice) {
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> {
try {
HoodieTimer timer = new HoodieTimer().startTimer();
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
}
// Open base file reader
Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
try {
HoodieTimer timer = new HoodieTimer().startTimer();
// Open base file reader
Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest file slice
List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
getLogRecordScanner(logFiles, partitionName);
HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest file slice
List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
getLogRecordScanner(logFiles, partitionName);
HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
+baseFileOpenMs + logScannerOpenMs));
return Pair.of(baseFileReader, logRecordScanner);
} catch (IOException e) {
throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
}
});
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
+baseFileOpenMs + logScannerOpenMs));
return Pair.of(baseFileReader, logRecordScanner);
} catch (IOException e) {
throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
}
}
private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
@@ -349,7 +456,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return validInstantTimestamps;
}
public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles, String partitionName) {
public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
String partitionName) {
return getLogRecordScanner(logFiles, partitionName, Option.empty());
}
public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
String partitionName,
Option<Boolean> allowFullScanOverride) {
HoodieTimer timer = new HoodieTimer().startTimer();
List<String> sortedLogFilePaths = logFiles.stream()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -363,6 +477,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
boolean allowFullScan = allowFullScanOverride.orElseGet(() -> isFullScanAllowedForPartition(partitionName));
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
@@ -378,7 +494,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
.enableFullScan(metadataConfig.enableFullScan())
.allowFullScan(allowFullScan)
.withPartition(partitionName)
.build();
@@ -388,6 +504,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return Pair.of(logRecordScanner, logScannerOpenMs);
}
// NOTE: We're allowing eager full-scan of the log-files only for "files" partition.
// Other partitions (like "column_stats", "bloom_filters") will have to be fetched
// t/h point-lookups
private boolean isFullScanAllowedForPartition(String partitionName) {
switch (partitionName) {
case PARTITION_NAME_FILES:
return metadataConfig.allowFullScan();
case PARTITION_NAME_COLUMN_STATS:
case PARTITION_NAME_BLOOM_FILTERS:
default:
return false;
}
}
/**
* Returns a list of commits which were rolled back as part of a Rollback or Restore operation.
*
@@ -433,6 +564,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private synchronized void close(Pair<String, String> partitionFileSlicePair) {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
partitionReaders.remove(partitionFileSlicePair);
closeReader(readers);
}
private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
if (readers != null) {
try {
if (readers.getKey() != null) {

View File

@@ -18,11 +18,13 @@
package org.apache.hudi.metadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
@@ -31,19 +33,16 @@ import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
@@ -53,38 +52,16 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
// Set of all record keys that are to be read in memory
private Set<String> mergeKeyFilter;
private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName,
List<String> logFilePaths,
Schema readerSchema, String latestInstantTime,
Long maxMemorySizeInBytes, int bufferSize,
String spillableMapBasePath, Set<String> mergeKeyFilter,
String spillableMapBasePath,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
Option<InstantRange> instantRange, boolean enableFullScan) {
Option<InstantRange> instantRange, boolean allowFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false,
enableFullScan, Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
this.mergeKeyFilter = mergeKeyFilter;
if (enableFullScan) {
performScan();
}
}
@Override
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(hoodieRecord.getRecordKey())) {
super.processNextRecord(hoodieRecord);
}
}
@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(deleteRecord.getRecordKey())) {
super.processNextDeletedRecord(deleteRecord);
}
spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan, Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
}
@Override
@@ -118,24 +95,37 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
* @return {@code HoodieRecord} if key was found else {@code Option.empty()}
*/
public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
}
@SuppressWarnings("unchecked")
public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
// Following operations have to be atomic, otherwise concurrent
// readers would race with each other and could crash when
// processing log block records as part of scan.
synchronized (this) {
records.clear();
scanInternal(Option.of(new KeySpec(keyPrefixes, false)));
return records.values().stream()
.filter(Objects::nonNull)
.map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
.collect(Collectors.toList());
}
}
@SuppressWarnings("unchecked")
public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
// Following operations have to be atomic, otherwise concurrent
// readers would race with each other and could crash when
// processing log block records as part of scan.
records.clear();
scan(Option.of(keys));
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> metadataRecords = new ArrayList<>();
keys.forEach(entry -> {
if (records.containsKey(entry)) {
metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry))));
} else {
metadataRecords.add(Pair.of(entry, Option.empty()));
}
});
return metadataRecords;
synchronized (this) {
records.clear();
scan(keys);
return keys.stream()
.map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
.collect(Collectors.toList());
}
}
@Override
@@ -147,9 +137,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
* Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
*/
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
private Set<String> mergeKeyFilter = Collections.emptySet();
private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
private boolean enableInlineReading;
private boolean allowFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -227,26 +215,21 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
return this;
}
public Builder withMergeKeyFilter(Set<String> mergeKeyFilter) {
this.mergeKeyFilter = mergeKeyFilter;
return this;
}
public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
return this;
}
public Builder enableFullScan(boolean enableFullScan) {
this.enableFullScan = enableFullScan;
public Builder allowFullScan(boolean enableFullScan) {
this.allowFullScan = enableFullScan;
return this;
}
@Override
public HoodieMetadataMergedLogRecordReader build() {
return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan);
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath,
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan);
}
}

View File

@@ -75,8 +75,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
@@ -391,7 +391,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
public Option<IndexedRecord> getInsertValue(Schema schemaIgnored, Properties propertiesIgnored) throws IOException {
if (key == null) {
return Option.empty();
}

View File

@@ -24,7 +24,9 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
@@ -159,6 +161,17 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName)
throws HoodieMetadataException;
/**
* Fetch records by key prefixes. Key prefix passed is expected to match the same prefix as stored in Metadata table partitions. For eg, in case of col stats partition,
* actual keys in metadata partition is encoded values of column name, partition name and file name. So, key prefixes passed to this method is expected to be encoded already.
*
* @param keyPrefixes list of key prefixes for which interested records are looked up for.
* @param partitionName partition name in metadata table where the records are looked up for.
* @return {@link HoodieData} of {@link HoodieRecord}s with records matching the passed in key prefixes.
*/
HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
String partitionName);
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
*/

View File

@@ -105,9 +105,9 @@ public class HoodieTableMetadataUtil {
private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
protected static final String PARTITION_NAME_FILES = "files";
protected static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
public static final String PARTITION_NAME_FILES = "files";
public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
/**
* Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, pretending
@@ -815,7 +815,7 @@ public class HoodieTableMetadataUtil {
* @param path
* @return
*/
static String getPartition(@Nonnull String path) {
public static String getPartition(@Nonnull String path) {
return EMPTY_PARTITION_NAME.equals(path) ? NON_PARTITIONED_NAME : path;
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import java.util.function.Supplier;
// TODO java-doc
public class LazyRef<T> {
private volatile boolean initialized;
private Supplier<T> initializer;
private T ref;
private LazyRef(Supplier<T> initializer) {
this.initializer = initializer;
this.ref = null;
this.initialized = false;
}
private LazyRef(T ref) {
this.initializer = null;
this.ref = ref;
this.initialized = true;
}
public T get() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
this.ref = initializer.get();
this.initializer = null;
initialized = true;
}
}
}
return ref;
}
public static <T> LazyRef<T> lazy(Supplier<T> initializer) {
return new LazyRef<>(initializer);
}
public static <T> LazyRef<T> eager(T ref) {
return new LazyRef<>(ref);
}
}

View File

@@ -2038,7 +2038,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
* Utility to convert the given iterator to a List.
*/
private static List<IndexedRecord> getRecords(HoodieDataBlock dataBlock) {
ClosableIterator<IndexedRecord> itr = dataBlock.getRecordItr();
ClosableIterator<IndexedRecord> itr = dataBlock.getRecordIterator();
List<IndexedRecord> elements = new ArrayList<>();
itr.forEachRemaining(elements::add);

View File

@@ -17,22 +17,39 @@
package org.apache.hudi
import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, tryUnpackNonNullVal}
import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.model.HoodieMetadataRecord
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession}
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
/**
* Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
* providing convenient interfaces to read it, transpose, etc
*/
trait ColumnStatsIndexSupport {
trait ColumnStatsIndexSupport extends SparkAdapterSupport {
def readColumnStatsIndex(spark: SparkSession, metadataTablePath: String): DataFrame = {
def readColumnStatsIndex(spark: SparkSession,
tableBasePath: String,
metadataConfig: HoodieMetadataConfig,
targetColumns: Seq[String] = Seq.empty): DataFrame = {
val targetColStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
@@ -43,11 +60,17 @@ trait ColumnStatsIndexSupport {
(targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
val metadataTableDF = spark.read.format("org.apache.hudi")
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
val metadataTableDF: DataFrame = {
// NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
// by only fetching Column Stats Index records pertaining to the requested columns.
// Otherwise we fallback to read whole Column Stats Index
if (targetColumns.nonEmpty) {
readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
} else {
readFullColumnStatsIndexInternal(spark, tableBasePath)
}
}
// TODO filter on (column, partition) prefix
val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
.select(requiredMetadataIndexColumns.map(col): _*)
@@ -105,34 +128,40 @@ trait ColumnStatsIndexSupport {
// of the transposed table
val sortedColumns = TreeSet(targetColumns: _*)
val transposedRDD = colStatsDF.rdd
.filter(row => sortedColumns.contains(row.getString(colStatsSchemaOrdinalsMap("columnName"))))
.map { row =>
val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("minValue")))
val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("maxValue")))
val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val colName = row.getString(colStatsSchemaOrdinalsMap("columnName"))
val transposedRDD = colStatsDF.rdd
.filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
.map { row =>
val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
val colName = row.getString(colNameOrdinal)
val colType = tableSchemaFieldMap(colName).dataType
val rowValsSeq = row.toSeq.toArray
rowValsSeq(colStatsSchemaOrdinalsMap("minValue")) = deserialize(minValue, colType)
rowValsSeq(colStatsSchemaOrdinalsMap("maxValue")) = deserialize(maxValue, colType)
rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
Row(rowValsSeq:_*)
}
.groupBy(r => r.getString(colStatsSchemaOrdinalsMap("fileName")))
.groupBy(r => r.getString(fileNameOrdinal))
.foldByKey(Seq[Row]()) {
case (_, columnRows) =>
// Rows seq is always non-empty (otherwise it won't be grouped into)
val fileName = columnRows.head.get(colStatsSchemaOrdinalsMap("fileName"))
val fileName = columnRows.head.get(fileNameOrdinal)
val coalescedRowValuesSeq = columnRows.toSeq
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout
.sortBy(_.getString(colStatsSchemaOrdinalsMap("columnName")))
.sortBy(_.getString(colNameOrdinal))
.foldLeft(Seq[Any](fileName)) {
case (acc, columnRow) =>
acc ++ Seq("minValue", "maxValue", "nullCount").map(ord => columnRow.get(colStatsSchemaOrdinalsMap(ord)))
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord))
}
Seq(Row(coalescedRowValuesSeq:_*))
@@ -147,6 +176,49 @@ trait ColumnStatsIndexSupport {
spark.createDataFrame(transposedRDD, indexSchema)
}
private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = {
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
spark.read.format("org.apache.hudi")
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
}
private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
// - Fetching the records from CSI by key-prefixes (encoded column names)
// - Deserializing fetched records into [[InternalRow]]s
// - Composing [[DataFrame]]
val metadataTableDF = {
val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
// TODO encoding should be done internally w/in HoodieBackedTableMetadata
val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
HoodieJavaRDD.getJavaRDD(
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
)
val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
it.map { record =>
// schema and props are ignored for generating metadata record from the payload
// instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
toScalaOption(record.getData.getInsertValue(null, null))
.flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
.orNull
}
}
HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
}
metadataTableDF
}
}
object ColumnStatsIndexSupport {
@@ -156,6 +228,9 @@ object ColumnStatsIndexSupport {
private val COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"
private val COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"
private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
/**
* @VisibleForTesting
*/

View File

@@ -27,7 +27,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.getPartitionPath
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -68,7 +68,8 @@ case class HoodieTableState(tablePath: String,
recordKeyField: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
recordPayloadClassName: String)
recordPayloadClassName: String,
metadataConfig: HoodieMetadataConfig)
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -135,7 +136,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val internalSchemaFromMeta = try {
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _ => InternalSchema.getEmptyInternalSchema
case _: Exception => InternalSchema.getEmptyInternalSchema
}
(avroSchema, internalSchemaFromMeta)
}
@@ -339,7 +340,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
recordKeyField = recordKeyField,
preCombineFieldOpt = preCombineFieldOpt,
usesVirtualKeys = !tableConfig.populateMetaFields(),
recordPayloadClassName = tableConfig.getPayloadClass
recordPayloadClassName = tableConfig.getPayloadClass,
metadataConfig = fileIndex.metadataConfig
)
}

View File

@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -195,15 +195,14 @@ case class HoodieFileIndex(spark: SparkSession,
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
val fs = metaClient.getFs
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
Option.empty
} else {
val colStatsDF: DataFrame = readColumnStatsIndex(spark, metadataTablePath)
val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(colStatsDF) {
val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)

View File

@@ -23,7 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -324,17 +324,23 @@ private object HoodieMergeOnReadRDD {
val fs = FSUtils.getFs(tablePath, hadoopConf)
if (HoodieTableMetadata.isMetadataTable(tablePath)) {
val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
val metadataConfig = tableState.metadataConfig
val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
val metadataTable = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(hadoopConf), metadataConfig,
dataTableBasePath,
hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
// We have to force full-scan for the MT log record reader, to make sure
// we can iterate over all of the partitions, since by default some of the partitions (Column Stats,
// Bloom Filter) are in "point-lookup" mode
val forceFullScan = true
// NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
// of indirection among MT partitions)
val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath).getLeft
metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
.getLeft
} else {
val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)

View File

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -360,7 +361,7 @@ object HoodieSparkSqlWriter {
None
}
} catch {
case _ => None
case _: Exception => None
}
}
@@ -568,12 +569,6 @@ object HoodieSparkSqlWriter {
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}
def toProperties(params: Map[String, String]): TypedProperties = {
val props = new TypedProperties()
params.foreach(kv => props.setProperty(kv._1, kv._2))
props
}
private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {

View File

@@ -91,12 +91,6 @@ object HoodieWriterUtils {
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
def toProperties(params: Map[String, String]): TypedProperties = {
val props = new TypedProperties()
params.foreach(kv => props.setProperty(kv._1, kv._2))
props
}
/**
* Get the partition columns to stored to hoodie.properties.
* @param parameters

View File

@@ -308,7 +308,7 @@ object SparkHoodieTableFileIndex {
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
configProperties.asScala(QUERY_TYPE.key) match {
configProperties.asScala.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL
case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED

View File

@@ -154,6 +154,7 @@ class HoodieStreamSource(
} else {
// Consume the data between (startCommitTime, endCommitTime]
val incParams = parameters ++ Map(
DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startCommitTime(startOffset),
DataSourceReadOptions.END_INSTANTTIME.key -> endOffset.commitTime
)

View File

@@ -372,7 +372,11 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
// NOTE: Metadata Table has to be enabled on the read path as well
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
)
val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)

View File

@@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
@@ -34,6 +34,8 @@ import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -69,8 +71,9 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
cleanupSparkContexts()
}
@Test
def testMetadataColumnStatsIndex(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -80,6 +83,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
@@ -104,9 +108,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(opts))
.build()
val colStatsDF = readColumnStatsIndex(spark, metadataTablePath)
val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -146,7 +152,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
val updatedColStatsDF = readColumnStatsIndex(spark, metadataTablePath)
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =

View File

@@ -51,17 +51,20 @@ class TestLayoutOptimization extends HoodieClientTestBase {
.add("c7", BinaryType)
.add("c8", ByteType)
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
) ++ metadataOpts
@BeforeEach
override def setUp() {
@@ -134,6 +137,7 @@ class TestLayoutOptimization extends HoodieClientTestBase {
val readDfSkip =
spark.read
.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.options(metadataOpts)
.format("hudi")
.load(basePath)

View File

@@ -49,19 +49,21 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
def testReadability(): Unit = {
val dataGen = new HoodieTestDataGenerator()
val opts: Map[String, String] = commonOpts ++ Map(
val metadataOpts: Map[String, String] = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
)
val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1")
// Insert records
val newRecords = dataGen.generateInserts("001", 100)
val newRecordsDF = parseRecords(recordsToStrings(newRecords).asScala)
newRecordsDF.write.format(hudi)
.options(opts)
.options(combinedOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
@@ -71,13 +73,13 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
val updatedRecordsDF = parseRecords(recordsToStrings(updatedRecords).asScala)
updatedRecordsDF.write.format(hudi)
.options(opts)
.options(combinedOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
// Files partition of MT
val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
val filesPartitionDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/files")
// Smoke test
filesPartitionDF.show()
@@ -95,7 +97,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
assertEquals(expectedKeys, keys)
// Column Stats Index partition of MT
val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
val colStatsDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
// Smoke test
colStatsDF.show()