1
0

[HUDI-3446] Supports batch reader in BootstrapOperator#loadRecords (#4837)

* [HUDI-3446] Supports batch Reader in BootstrapOperator#loadRecords
This commit is contained in:
Bo Cui
2022-02-19 21:21:48 +08:00
committed by GitHub
parent f15125c0cd
commit 83279971a1
8 changed files with 197 additions and 126 deletions

View File

@@ -57,9 +57,9 @@ public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O
BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath()); BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
List<HoodieKey> hoodieKeyList = new ArrayList<>(); List<HoodieKey> hoodieKeyList = new ArrayList<>();
if (keyGeneratorOpt.isPresent()) { if (keyGeneratorOpt.isPresent()) {
hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt); hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()), keyGeneratorOpt);
} else { } else {
hoodieKeyList = baseFileUtils.fetchRecordKeyPartitionPath(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())); hoodieKeyList = baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new Path(baseFile.getPath()));
} }
return hoodieKeyList.stream() return hoodieKeyList.stream()
.map(entry -> Pair.of(entry, .map(entry -> Pair.of(entry,

View File

@@ -167,18 +167,35 @@ public abstract class BaseFileUtils {
* Fetch {@link HoodieKey}s from the given data file. * Fetch {@link HoodieKey}s from the given data file.
* @param configuration configuration to build fs object * @param configuration configuration to build fs object
* @param filePath The data file path * @param filePath The data file path
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file * @return {@link List} of {@link HoodieKey}s fetched from the data file
*/ */
public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath);
/**
* Provides a closable iterator for reading the given data file.
* @param configuration configuration to build fs object
* @param filePath The data file path
* @param keyGeneratorOpt instance of KeyGenerator.
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file
*/
public abstract ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
/**
* Provides a closable iterator for reading the given data file.
* @param configuration configuration to build fs object
* @param filePath The data file path
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the file
*/
public abstract ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath);
/** /**
* Fetch {@link HoodieKey}s from the given data file. * Fetch {@link HoodieKey}s from the given data file.
* @param configuration configuration to build fs object * @param configuration configuration to build fs object
* @param filePath The data file path * @param filePath The data file path
* @param keyGeneratorOpt instance of KeyGenerator. * @param keyGeneratorOpt instance of KeyGenerator.
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file * @return {@link List} of {@link HoodieKey}s fetched from the data file
*/ */
public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt); public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
/** /**
* Read the Avro schema of the data file. * Read the Avro schema of the data file.

View File

@@ -29,19 +29,18 @@ import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription; import org.apache.orc.TypeDescription;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
/** /**
* This class wraps a ORC reader and provides an iterator based api to read from an ORC file. * This class wraps a ORC reader and provides an iterator based api to read from an ORC file.
*/ */
public class OrcReaderIterator<T> implements Iterator<T> { public class OrcReaderIterator<T> implements ClosableIterator<T> {
private final RecordReader recordReader; private final RecordReader recordReader;
private final Schema avroSchema; private final Schema avroSchema;
List<String> fieldNames; private final List<String> fieldNames;
List<TypeDescription> orcFieldTypes; private final List<TypeDescription> orcFieldTypes;
Schema[] avroFieldSchemas; private final Schema[] avroFieldSchemas;
private VectorizedRowBatch batch; private final VectorizedRowBatch batch;
private int rowInBatch; private int rowInBatch;
private T next; private T next;
@@ -52,7 +51,7 @@ public class OrcReaderIterator<T> implements Iterator<T> {
this.orcFieldTypes = orcSchema.getChildren(); this.orcFieldTypes = orcSchema.getChildren();
this.avroFieldSchemas = fieldNames.stream() this.avroFieldSchemas = fieldNames.stream()
.map(fieldName -> avroSchema.getField(fieldName).schema()) .map(fieldName -> avroSchema.getField(fieldName).schema())
.toArray(size -> new Schema[size]); .toArray(Schema[]::new);
this.batch = orcSchema.createRowBatch(); this.batch = orcSchema.createRowBatch();
this.rowInBatch = 0; this.rowInBatch = 0;
} }
@@ -115,4 +114,9 @@ public class OrcReaderIterator<T> implements Iterator<T> {
rowInBatch++; rowInBatch++;
return record; return record;
} }
@Override
public void close() {
FileIOUtils.closeQuietly(this.recordReader);
}
} }

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.common.util;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@@ -55,29 +56,23 @@ import org.apache.orc.TypeDescription;
public class OrcUtils extends BaseFileUtils { public class OrcUtils extends BaseFileUtils {
/** /**
* Fetch {@link HoodieKey}s from the given ORC file. * Provides a closable iterator for reading the given ORC file.
* *
* @param filePath The ORC file path.
* @param configuration configuration to build fs object * @param configuration configuration to build fs object
* @return {@link List} of {@link HoodieKey}s fetched from the ORC file * @param filePath The ORC file path
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the ORC file
*/ */
@Override @Override
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath) {
List<HoodieKey> hoodieKeys = new ArrayList<>();
try { try {
if (!filePath.getFileSystem(configuration).exists(filePath)) {
return new ArrayList<>();
}
Configuration conf = new Configuration(configuration); Configuration conf = new Configuration(configuration);
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema);
List<String> fieldNames = orcSchema.getFieldNames();
VectorizedRowBatch batch = orcSchema.createRowBatch();
RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
List<String> fieldNames = orcSchema.getFieldNames();
// column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields
int keyCol = -1; int keyCol = -1;
@@ -93,24 +88,43 @@ public class OrcUtils extends BaseFileUtils {
if (keyCol == -1 || partitionCol == -1) { if (keyCol == -1 || partitionCol == -1) {
throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath)); throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath));
} }
while (recordReader.nextBatch(batch)) { return new OrcReaderIterator<>(recordReader, readSchema, orcSchema);
BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol]; } catch (IOException e) {
BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol]; throw new HoodieIOException("Failed to open reader from ORC file:" + filePath, e);
for (int i = 0; i < batch.size; i++) { }
String rowKey = rowKeys.toString(i); }
String partitionPath = partitionPaths.toString(i);
hoodieKeys.add(new HoodieKey(rowKey, partitionPath)); /**
} * Fetch {@link HoodieKey}s from the given ORC file.
*
* @param filePath The ORC file path.
* @param configuration configuration to build fs object
* @return {@link List} of {@link HoodieKey}s fetched from the ORC file
*/
@Override
public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath) {
try {
if (!filePath.getFileSystem(configuration).exists(filePath)) {
return Collections.emptyList();
} }
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); throw new HoodieIOException("Failed to read from ORC file:" + filePath, e);
} }
List<HoodieKey> hoodieKeys = new ArrayList<>();
try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(configuration, filePath, Option.empty())) {
iterator.forEachRemaining(hoodieKeys::add);
}
return hoodieKeys; return hoodieKeys;
} }
@Override @Override
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) { public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new HoodieIOException("UnsupportedOperation : Disabling meta fields not yet supported for Orc"); throw new UnsupportedOperationException("Custom key generator is not supported yet");
}
@Override
public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new UnsupportedOperationException("Custom key generator is not supported yet");
} }
/** /**
@@ -119,8 +133,7 @@ public class OrcUtils extends BaseFileUtils {
@Override @Override
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) { public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
Schema avroSchema; Schema avroSchema;
try { try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) {
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema()); avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema());
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io); throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io);
@@ -134,14 +147,14 @@ public class OrcUtils extends BaseFileUtils {
@Override @Override
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) { public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) {
List<GenericRecord> records = new ArrayList<>(); List<GenericRecord> records = new ArrayList<>();
try { try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration))) {
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
TypeDescription orcSchema = reader.getSchema(); TypeDescription orcSchema = reader.getSchema();
RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema)); try (RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema))) {
OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
while (iterator.hasNext()) { while (iterator.hasNext()) {
GenericRecord record = iterator.next(); GenericRecord record = iterator.next();
records.add(record); records.add(record);
}
} }
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io); throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io);
@@ -161,35 +174,35 @@ public class OrcUtils extends BaseFileUtils {
@Override @Override
public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter) public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter)
throws HoodieIOException { throws HoodieIOException {
try { try (Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));) {
Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
Set<String> filteredRowKeys = new HashSet<>();
TypeDescription schema = reader.getSchema(); TypeDescription schema = reader.getSchema();
List<String> fieldNames = schema.getFieldNames(); try (RecordReader recordReader = reader.rows(new Options(conf).schema(schema))) {
VectorizedRowBatch batch = schema.createRowBatch(); Set<String> filteredRowKeys = new HashSet<>();
RecordReader recordReader = reader.rows(new Options(conf).schema(schema)); List<String> fieldNames = schema.getFieldNames();
VectorizedRowBatch batch = schema.createRowBatch();
// column index for the RECORD_KEY_METADATA_FIELD field // column index for the RECORD_KEY_METADATA_FIELD field
int colIndex = -1; int colIndex = -1;
for (int i = 0; i < fieldNames.size(); i++) { for (int i = 0; i < fieldNames.size(); i++) {
if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
colIndex = i; colIndex = i;
break; break;
}
}
if (colIndex == -1) {
throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
}
while (recordReader.nextBatch(batch)) {
BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
for (int i = 0; i < batch.size; i++) {
String rowKey = rowKeys.toString(i);
if (filter.isEmpty() || filter.contains(rowKey)) {
filteredRowKeys.add(rowKey);
} }
} }
if (colIndex == -1) {
throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
}
while (recordReader.nextBatch(batch)) {
BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
for (int i = 0; i < batch.size; i++) {
String rowKey = rowKeys.toString(i);
if (filter.isEmpty() || filter.contains(rowKey)) {
filteredRowKeys.add(rowKey);
}
}
}
return filteredRowKeys;
} }
return filteredRowKeys;
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io); throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io);
} }
@@ -198,8 +211,7 @@ public class OrcUtils extends BaseFileUtils {
@Override @Override
public Map<String, String> readFooter(Configuration conf, boolean required, public Map<String, String> readFooter(Configuration conf, boolean required,
Path orcFilePath, String... footerNames) { Path orcFilePath, String... footerNames) {
try { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
Map<String, String> footerVals = new HashMap<>(); Map<String, String> footerVals = new HashMap<>();
List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList(); List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList();
Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap( Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap(
@@ -221,8 +233,7 @@ public class OrcUtils extends BaseFileUtils {
@Override @Override
public Schema readAvroSchema(Configuration conf, Path orcFilePath) { public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
try { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
if (reader.hasMetadataValue("orc.avro.schema")) { if (reader.hasMetadataValue("orc.avro.schema")) {
ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema"); ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema");
byte[] bytes = new byte[metadataValue.remaining()]; byte[] bytes = new byte[metadataValue.remaining()];
@@ -239,8 +250,7 @@ public class OrcUtils extends BaseFileUtils {
@Override @Override
public long getRowCount(Configuration conf, Path orcFilePath) { public long getRowCount(Configuration conf, Path orcFilePath) {
try { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) {
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
return reader.getNumberOfRows(); return reader.getNumberOfRows();
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io); throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);

View File

@@ -24,13 +24,12 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
/** /**
* This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in
* {@link BoundedInMemoryQueue} * {@link BoundedInMemoryQueue}
*/ */
public class ParquetReaderIterator<T> implements Iterator<T> { public class ParquetReaderIterator<T> implements ClosableIterator<T> {
// Parquet reader for an existing parquet file // Parquet reader for an existing parquet file
private final ParquetReader<T> parquetReader; private final ParquetReader<T> parquetReader;
@@ -73,7 +72,11 @@ public class ParquetReaderIterator<T> implements Iterator<T> {
} }
} }
public void close() throws IOException { public void close() {
parquetReader.close(); try {
parquetReader.close();
} catch (IOException e) {
throw new HoodieException("Exception while closing the parquet reader", e);
}
} }
} }

View File

@@ -128,12 +128,26 @@ public class ParquetUtils extends BaseFileUtils {
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
*/ */
@Override @Override
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath) {
return fetchRecordKeyPartitionPathInternal(configuration, filePath, Option.empty()); return fetchHoodieKeys(configuration, filePath, Option.empty());
} }
private List<HoodieKey> fetchRecordKeyPartitionPathInternal(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) { @Override
List<HoodieKey> hoodieKeys = new ArrayList<>(); public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath) {
return getHoodieKeyIterator(configuration, filePath, Option.empty());
}
/**
* Returns a closable iterator for reading the given parquet file.
*
* @param configuration configuration to build fs object
* @param filePath The parquet file path
* @param keyGeneratorOpt instance of KeyGenerator
*
* @return {@link ClosableIterator} of {@link HoodieKey}s for reading the parquet file
*/
@Override
public ClosableIterator<HoodieKey> getHoodieKeyIterator(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
try { try {
Configuration conf = new Configuration(configuration); Configuration conf = new Configuration(configuration);
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
@@ -146,27 +160,11 @@ public class ParquetUtils extends BaseFileUtils {
.orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema()); .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setAvroReadSchema(conf, readSchema);
AvroReadSupport.setRequestedProjection(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema);
ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build(); ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(filePath).withConf(conf).build();
Object obj = reader.read(); return HoodieKeyIterator.getInstance(new ParquetReaderIterator<>(reader), keyGeneratorOpt);
while (obj != null) {
if (obj instanceof GenericRecord) {
String recordKey = null;
String partitionPath = null;
if (keyGeneratorOpt.isPresent()) {
recordKey = keyGeneratorOpt.get().getRecordKey((GenericRecord) obj);
partitionPath = keyGeneratorOpt.get().getPartitionPath((GenericRecord) obj);
} else {
recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
}
hoodieKeys.add(new HoodieKey(recordKey, partitionPath));
obj = reader.read();
}
}
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("Failed to read from Parquet file " + filePath, e); throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
} }
return hoodieKeys;
} }
/** /**
@@ -174,12 +172,16 @@ public class ParquetUtils extends BaseFileUtils {
* *
* @param configuration configuration to build fs object * @param configuration configuration to build fs object
* @param filePath The parquet file path. * @param filePath The parquet file path.
* @param keyGeneratorOpt * @param keyGeneratorOpt instance of KeyGenerator.
* @return {@link List} of {@link HoodieKey}s fetched from the parquet file * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
*/ */
@Override @Override
public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) { public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
return fetchRecordKeyPartitionPathInternal(configuration, filePath, keyGeneratorOpt); List<HoodieKey> hoodieKeys = new ArrayList<>();
try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(configuration, filePath, keyGeneratorOpt)) {
iterator.forEachRemaining(hoodieKeys::add);
return hoodieKeys;
}
} }
public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
@@ -228,10 +230,8 @@ public class ParquetUtils extends BaseFileUtils {
*/ */
@Override @Override
public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) { public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
ParquetReader reader = null;
List<GenericRecord> records = new ArrayList<>(); List<GenericRecord> records = new ArrayList<>();
try { try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(configuration).build()) {
reader = AvroParquetReader.builder(filePath).withConf(configuration).build();
Object obj = reader.read(); Object obj = reader.read();
while (obj != null) { while (obj != null) {
if (obj instanceof GenericRecord) { if (obj instanceof GenericRecord) {
@@ -242,14 +242,6 @@ public class ParquetUtils extends BaseFileUtils {
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("Failed to read avro records from Parquet " + filePath, e); throw new HoodieIOException("Failed to read avro records from Parquet " + filePath, e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
// ignore
}
}
} }
return records; return records;
} }
@@ -424,4 +416,55 @@ public class ParquetUtils extends BaseFileUtils {
throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName())); throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName()));
} }
} }
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* An iterator that can apply the given function {@code func} to transform records
* from the underneath record iterator to hoodie keys.
*/
private static class HoodieKeyIterator implements ClosableIterator<HoodieKey> {
private final ClosableIterator<GenericRecord> nestedItr;
private final Function<GenericRecord, HoodieKey> func;
public static HoodieKeyIterator getInstance(ClosableIterator<GenericRecord> nestedItr, Option<BaseKeyGenerator> keyGenerator) {
return new HoodieKeyIterator(nestedItr, keyGenerator);
}
private HoodieKeyIterator(ClosableIterator<GenericRecord> nestedItr, Option<BaseKeyGenerator> keyGenerator) {
this.nestedItr = nestedItr;
if (keyGenerator.isPresent()) {
this.func = retVal -> {
String recordKey = keyGenerator.get().getRecordKey(retVal);
String partitionPath = keyGenerator.get().getPartitionPath(retVal);
return new HoodieKey(recordKey, partitionPath);
};
} else {
this.func = retVal -> {
String recordKey = retVal.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String partitionPath = retVal.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
return new HoodieKey(recordKey, partitionPath);
};
}
}
@Override
public void close() {
if (this.nestedItr != null) {
this.nestedItr.close();
}
}
@Override
public boolean hasNext() {
return this.nestedItr.hasNext();
}
@Override
public HoodieKey next() {
return this.func.apply(this.nestedItr.next());
}
}
} }

View File

@@ -147,7 +147,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
// Read and verify // Read and verify
List<HoodieKey> fetchedRows = List<HoodieKey> fetchedRows =
parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match"); assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");
for (HoodieKey entry : fetchedRows) { for (HoodieKey entry : fetchedRows) {
@@ -173,7 +173,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
// Read and verify // Read and verify
List<HoodieKey> fetchedRows = List<HoodieKey> fetchedRows =
parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath),
Option.of(new TestBaseKeyGen("abc","def"))); Option.of(new TestBaseKeyGen("abc","def")));
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match"); assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -210,17 +211,10 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
if (!isValidFile(baseFile.getFileStatus())) { if (!isValidFile(baseFile.getFileStatus())) {
return; return;
} }
try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
final List<HoodieKey> hoodieKeys; iterator.forEachRemaining(hoodieKey -> {
try { output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
hoodieKeys = });
fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
}
for (HoodieKey hoodieKey : hoodieKeys) {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
} }
}); });