1
0

[HUDI-3895] Fixing file-partitioning seq for base-file only views to make sure we bucket the files efficiently (#5337)

This commit is contained in:
Alexey Kudinkin
2022-04-18 13:06:52 -07:00
committed by GitHub
parent 1718bcab84
commit 7ecb47cd21

View File

@@ -84,21 +84,24 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
/**
 * Collects the base-file splits to be read for the given filters.
 *
 * NOTE(review): every line below carries the pre-change text followed by the
 * post-change text of this same method (side-by-side diff rendering); this
 * description follows the post-change text.
 *
 * Steps, as visible in the body:
 *  1. `listLatestBaseFiles(globPaths, partitionFilters, dataFilters)` is called and the
 *     values of its result are flattened into a single sequence of files.
 *  2. Every file is passed to `HoodieDataSourceHelper.splitFiles`, with partition values
 *     obtained from `getPartitionColumnsAsInternalRow(file)`. Per the in-line TODO,
 *     parquet is currently assumed as the underlying format.
 *  3. Post-change only: the resulting splits are sorted by `length`, largest first
 *     (`implicitly[Ordering[Long]].reverse`). The pre-change text has no sorting step.
 *  4. `sparkAdapter.getFilePartitions` groups the splits, using
 *     `sparkSession.sessionState.conf.filesMaxPartitionBytes` as the size limit, and each
 *     resulting group is wrapped via `HoodieBaseFileSplit.apply`.
 *
 * NOTE(review): presumably `sparkAdapter.getFilePartitions` delegates to Spark's own
 * file-partition packing, which is why the largest-first ordering matters for efficient
 * bucketing -- confirm against the adapter implementation.
 *
 * @param partitionFilters filter expressions forwarded as-is to `listLatestBaseFiles`
 * @param dataFilters      filter expressions forwarded as-is to `listLatestBaseFiles`
 * @return one [[HoodieBaseFileSplit]] per group returned by `sparkAdapter.getFilePartitions`
 */
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters) val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
val fileSplits = partitions.values.toSeq.flatMap { files => val fileSplits = partitions.values.toSeq
files.flatMap { file => .flatMap { files =>
// TODO move to adapter files.flatMap { file =>
// TODO fix, currently assuming parquet as underlying format // TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles( HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession, sparkSession = sparkSession,
file = file, file = file,
// TODO clarify why this is required partitionValues = getPartitionColumnsAsInternalRow(file)
partitionValues = getPartitionColumnsAsInternalRow(file) )
) }
} }
} // NOTE: It's important to order the splits in the reverse order of their
// size so that we can subsequently bucket them in an efficient manner
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes).map(HoodieBaseFileSplit.apply) sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
.map(HoodieBaseFileSplit.apply)
} }
} }