1
0

[HUDI-3724] Fixing closure of ParquetReader (#5141)

This commit is contained in:
Sivabalan Narayanan
2022-03-27 18:36:15 -07:00
committed by GitHub
parent 9da2dd416e
commit f2a93ead3b
2 changed files with 18 additions and 2 deletions

View File

@@ -19,7 +19,9 @@
package org.apache.hudi.io.storage; package org.apache.hudi.io.storage;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.avro.Schema; import org.apache.avro.Schema;
@@ -30,14 +32,17 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetReader;
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> { public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
private final Path path; private final Path path;
private final Configuration conf; private final Configuration conf;
private final BaseFileUtils parquetUtils; private final BaseFileUtils parquetUtils;
private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
public HoodieParquetReader(Configuration configuration, Path path) { public HoodieParquetReader(Configuration configuration, Path path) {
this.conf = configuration; this.conf = configuration;
@@ -64,7 +69,9 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
public Iterator<R> getRecordIterator(Schema schema) throws IOException { public Iterator<R> getRecordIterator(Schema schema) throws IOException {
AvroReadSupport.setAvroReadSchema(conf, schema); AvroReadSupport.setAvroReadSchema(conf, schema);
ParquetReader<R> reader = AvroParquetReader.<R>builder(path).withConf(conf).build(); ParquetReader<R> reader = AvroParquetReader.<R>builder(path).withConf(conf).build();
return new ParquetReaderIterator<>(reader); ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
} }
@Override @Override
@@ -74,6 +81,7 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
@Override @Override
public void close() { public void close() {
readerIterators.forEach(entry -> entry.close());
} }
@Override @Override

View File

@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.io.storage.HoodieHFileReader import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.spark.TaskContext
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
@@ -47,6 +48,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext, SparkSession} import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import java.io.Closeable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try import scala.util.Try
@@ -333,7 +335,13 @@ object HoodieBaseRelation {
partitionedFile => { partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath) val extension = FSUtils.getFileExtension(partitionedFile.filePath)
if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) { if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
parquetReader.apply(partitionedFile) val iter = parquetReader.apply(partitionedFile)
if (iter.isInstanceOf[Closeable]) {
// register a callback to close parquetReader which will be executed on task completion.
// when tasks finished, this method will be called, and release resources.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
}
iter
} else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) { } else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
hfileReader.apply(partitionedFile) hfileReader.apply(partitionedFile)
} else { } else {