From 7ecb47cd21a2c387e46d600aea628f0ded27e302 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Mon, 18 Apr 2022 13:06:52 -0700
Subject: [PATCH] [HUDI-3895] Fixing file-partitioning seq for base-file only views to make sure we bucket the files efficiently (#5337)

---
 .../apache/hudi/BaseFileOnlyRelation.scala    | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 525292da6..f46b31b03 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -84,21 +84,24 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
     val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
-    val fileSplits = partitions.values.toSeq.flatMap { files =>
-      files.flatMap { file =>
-        // TODO move to adapter
-        // TODO fix, currently assuming parquet as underlying format
-        HoodieDataSourceHelper.splitFiles(
-          sparkSession = sparkSession,
-          file = file,
-          // TODO clarify why this is required
-          partitionValues = getPartitionColumnsAsInternalRow(file)
-        )
+    val fileSplits = partitions.values.toSeq
+      .flatMap { files =>
+        files.flatMap { file =>
+          // TODO fix, currently assuming parquet as underlying format
+          HoodieDataSourceHelper.splitFiles(
+            sparkSession = sparkSession,
+            file = file,
+            partitionValues = getPartitionColumnsAsInternalRow(file)
+          )
+        }
       }
-    }
+      // NOTE: It's important to order the splits in the reverse order of their
+      //       size so that we can subsequently bucket them in an efficient manner
+      .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
     val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
 
-    sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes).map(HoodieBaseFileSplit.apply)
+    sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
+      .map(HoodieBaseFileSplit.apply)
   }
 }