1
0

[HUDI-4205] Fix NullPointerException in HFile reader creation (#5841)

Replace SerializableConfiguration with SerializableWritable for broadcasting the Hadoop configuration before initializing HFile readers.
This commit is contained in:
Y Ethan Guo
2022-06-11 14:46:43 -07:00
committed by GitHub
parent 97ccf5dd18
commit fd8f7c5f6c

View File

@@ -34,9 +34,10 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.io.storage.HoodieHFileReader
+import org.apache.spark.SerializableWritable
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -535,11 +536,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
                               filters: Seq[Filter],
                               options: Map[String, String],
                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val hadoopConfBroadcast =
-      spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))
     partitionedFile => {
-      val hadoopConf = hadoopConfBroadcast.value.get()
+      val hadoopConf = hadoopConfBroadcast.value.value
       val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
         new CacheConfig(hadoopConf))