From ca440ccf881c67c308e72beaf6a561e12e1b4da2 Mon Sep 17 00:00:00 2001 From: pengzhiwei Date: Mon, 12 Jul 2021 17:31:23 +0800 Subject: [PATCH] [HUDI-2107] Support Read Log Only MOR Table For Spark (#3193) --- .../utils/HoodieRealtimeInputFormatUtils.java | 37 +++----- .../org/apache/hudi/HoodieFileIndex.scala | 40 ++++---- .../hudi/MergeOnReadSnapshotRelation.scala | 55 +++++------ .../hudi/testutils/DataSourceTestUtils.java | 29 ++++++ .../hudi/functional/TestMORDataSource.scala | 33 ++++++- .../sql/hudi/TestMereIntoLogOnlyTable.scala | 91 +++++++++++++++++++ 6 files changed, 214 insertions(+), 71 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index a7fbf66f9..22d30899c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; @@ -49,11 +50,13 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -137,16 +140,14 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { } // Return parquet file with a list of log files in the same file group. - public static Map> groupLogsByBaseFile(Configuration conf, List fileStatuses) { - Map> partitionsToParquetSplits = - fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent())); + public static List, List>> groupLogsByBaseFile(Configuration conf, List partitionPaths) { + Set partitionSet = new HashSet<>(partitionPaths); // TODO(vc): Should we handle also non-hoodie splits here? - Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet()); + Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); - // for all unique split parents, obtain all delta files based on delta commit timeline, - // grouped on file id - Map> resultMap = new HashMap<>(); - partitionsToParquetSplits.keySet().forEach(partitionPath -> { + // Get all the base file and it's log files pairs in required partition paths. + List, List>> baseAndLogsList = new ArrayList<>(); + partitionSet.forEach(partitionPath -> { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); @@ -161,28 +162,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) .orElse(Stream.empty()); - // subgroup splits again by file id & match with log files. - Map> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() - .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName()))); latestFileSlices.forEach(fileSlice -> { - List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); - dataFileSplits.forEach(split -> { - try { - List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); - resultMap.put(split, logFilePaths); - } catch (Exception e) { - throw new HoodieException("Error creating hoodie real time split ", e); - } - }); + baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths)); }); } catch (Exception e) { throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e); } }); - return resultMap; + return baseAndLogsList; } - + /** * Add a field to the existing fields projected. diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index ca2bf72cf..3aa41eb8e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -106,6 +106,12 @@ case class HoodieFileIndex( } } + private lazy val metadataConfig = { + val properties = new Properties() + properties.putAll(options.asJava) + HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + } + @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _ @transient @volatile private var cachedFileSize: Long = 0L @@ -195,8 +201,8 @@ case class HoodieFileIndex( * @param predicates The filter condition. * @return The Pruned partition paths. */ - private def prunePartition(partitionPaths: Seq[PartitionRowPath], - predicates: Seq[Expression]): Seq[PartitionRowPath] = { + def prunePartition(partitionPaths: Seq[PartitionRowPath], + predicates: Seq[Expression]): Seq[PartitionRowPath] = { val partitionColumnNames = partitionSchema.fields.map(_.name).toSet val partitionPruningPredicates = predicates.filter { @@ -222,26 +228,13 @@ case class HoodieFileIndex( } } - /** - * Load all partition paths and it's files under the query table path. - */ - private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { + def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) - val properties = new Properties() - properties.putAll(options.asJava) - val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build() - val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) // Load all the partition path from the basePath, and filter by the query partition path. // TODO load files from the queryPartitionPath directly. val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala .filter(_.startsWith(queryPartitionPath)) - - val writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath).withProperties(properties).build() - val maxListParallelism = writeConfig.getFileListingParallelism - - val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) val partitionSchema = _partitionSchemaFromProperties val timeZoneId = CaseInsensitiveMap(options) .get(DateTimeUtils.TIMEZONE_OPTION) @@ -250,7 +243,7 @@ case class HoodieFileIndex( val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark .sessionState.conf) // Convert partition path to PartitionRowPath - val partitionRowPaths = partitionPaths.map { partitionPath => + partitionPaths.map { partitionPath => val partitionRow = if (partitionSchema.fields.length == 0) { // This is a non-partitioned table InternalRow.empty @@ -308,7 +301,20 @@ case class HoodieFileIndex( } PartitionRowPath(partitionRow, partitionPath) } + } + /** + * Load all partition paths and it's files under the query table path. + */ + private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { + val properties = new Properties() + properties.putAll(options.asJava) + val writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath).withProperties(properties).build() + + val maxListParallelism = writeConfig.getFileListingParallelism + val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) + val partitionRowPaths = getAllQueryPartitionPaths // List files in all of the partition path. val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index f20bb4d25..381df1132 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -19,7 +19,6 @@ package org.apache.hudi import org.apache.avro.Schema -import org.apache.hudi.common.model.HoodieBaseFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils @@ -137,12 +136,15 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, } def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { - - val fileStatuses = if (globPaths.isDefined) { + // Get all partition paths + val partitionPaths = if (globPaths.isDefined) { // Load files from the global paths if it has defined to be compatible with the original mode val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get) - inMemoryFileIndex.allFiles() - } else { // Load files by the HoodieFileIndex. + val fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants, inMemoryFileIndex.allFiles().toArray) + fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent) + } else { // Load partition path by the HoodieFileIndex. val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) @@ -152,34 +154,35 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val partitionFilterExpression = HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema) - // if convert success to catalyst expression, use the partition prune - if (partitionFilterExpression.isDefined) { - hoodieFileIndex.listFiles(Seq(partitionFilterExpression.get), Seq.empty).flatMap(_.files) - } else { - hoodieFileIndex.allFiles - } + val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths + // If convert success to catalyst expression, use the partition prune + hoodieFileIndex.prunePartition(allPartitionPaths, partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty)) + .map(_.fullPartitionPath(metaClient.getBasePath)) } - if (fileStatuses.isEmpty) { // If this an empty table, return an empty split list. + if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list. List.empty[HoodieMergeOnReadFileSplit] } else { - val fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline.getCommitsTimeline - .filterCompletedInstants, fileStatuses.toArray) - val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList - - if (!fsView.getLastInstant.isPresent) { // Return empty list if the table has no commit + val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant() + if (!lastInstant.isPresent) { // Return empty list if the table has no commit List.empty } else { - val latestCommit = fsView.getLastInstant.get().getTimestamp - val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala - val fileSplits = fileGroup.map(kv => { - val baseFile = kv._1 - val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) - val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath) + val latestCommit = lastInstant.get().getTimestamp + val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala + val fileSplits = baseAndLogsList.map(kv => { + val baseFile = kv.getLeft + val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList) - val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen) - HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, + val baseDataPath = if (baseFile.isPresent) { + Some(PartitionedFile( + InternalRow.empty, + MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath), + 0, baseFile.get.getFileLen) + ) + } else { + None + } + HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }).toList fileSplits diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java index bdb19bd3d..ecc48b8ae 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java @@ -18,6 +18,12 @@ package org.apache.hudi.testutils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FileIOUtils; import org.apache.avro.Schema; @@ -118,4 +124,27 @@ public class DataSourceTestUtils { } return rows; } + + /** + * Test if there is only log files exists in the table. + */ + public static boolean isLogFileOnly(String basePath) throws IOException { + Configuration conf = new Configuration(); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() + .setConf(conf).setBasePath(basePath) + .build(); + String baseDataFormat = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + Path path = new Path(basePath); + FileSystem fs = path.getFileSystem(conf); + RemoteIterator files = fs.listFiles(path, true); + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + if (file.isFile()) { + if (file.getPath().toString().endsWith(baseDataFormat)) { + return false; + } + } + } + return true; + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 5574c01fb..1028eefd2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,20 +17,24 @@ package org.apache.hudi.functional +import org.apache.hadoop.fs.Path + import scala.collection.JavaConverters._ import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.DefaultHoodieRecordPayload +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator -import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions._ -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} @@ -677,4 +681,23 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(partitionCounts("2021/03/03"), count7) } + + @Test + def testReadLogOnlyMergeOnReadTable(): Unit = { + initMetaClient(HoodieTableType.MERGE_ON_READ) + val records1 = dataGen.generateInsertsContainsAllPartitions("000", 20) + val inputDF = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + // Use InMemoryIndex to generate log only mor table. + .option(HoodieIndexConfig.INDEX_TYPE_PROP.key, IndexType.INMEMORY.toString) + .mode(SaveMode.Overwrite) + .save(basePath) + // There should no base file in the file list. + assertTrue(DataSourceTestUtils.isLogFileOnly(basePath)) + // Test read log only mor table. + assertEquals(20, spark.read.format("hudi").load(basePath).count()) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala new file mode 100644 index 000000000..668f0f960 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMereIntoLogOnlyTable.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.testutils.DataSourceTestUtils + +class TestMereIntoLogOnlyTable extends TestHoodieSqlBase { + + test("Test Query Log Only MOR Table") { + withTempDir { tmp => + // Create table with INMEMORY index to generate log only mor table. + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | options ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, 'a1' as name, 11 as price, 1001 as ts + | ) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1001), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, 'a4' as name, 11 as price, 1000 as ts + | ) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 11.0, 1001), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000), + Seq(4, "a4", 11.0, 1000) + ) + } + } +}