diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 0a12c1d34..af0d7a6c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,13 +17,16 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
+import java.util.stream.Collectors
+
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hadoop.fs.GlobPattern
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
@@ -68,10 +71,10 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val lastInstant = commitTimeline.lastInstant().get()
 
-  private val commitsToReturn = commitTimeline.findInstantsInRange(
+  private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
     optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key),
-    optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, lastInstant.getTimestamp))
-    .getInstants.iterator().toList
+    optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key(), lastInstant.getTimestamp))
+  private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
 
   // use schema from a file produced in the end/latest instant
   val usedSchema: StructType = {
@@ -96,14 +99,31 @@ class IncrementalRelation(val sqlContext: SQLContext,
     val regularFileIdToFullPath = mutable.HashMap[String, String]()
     var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
 
+    // collect the file groups replaced by replacecommit instants in the queried range (fileId -> partition path)
+    val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+    val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+      val replaceMetadata = HoodieReplaceCommitMetadata.
+        fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+      replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+        entry.getValue.map { e =>
+          val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+          (e, fullPath)
+        }
+      }
+    }.toMap
+
     for (commit <- commitsToReturn) {
       val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
         .get, classOf[HoodieCommitMetadata])
 
       if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
-        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+          replacedFile.contains(k) && v.startsWith(replacedFile(k))
+        }
       } else {
-        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
+        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+          replacedFile.contains(k) && v.startsWith(replacedFile(k))
+        }
       }
     }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 19d2a08ce..44be4d160 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -20,7 +20,6 @@ package org.apache.hudi.functional
 import java.time.Instant
 import java.util
 import java.util.{Collections, Date, UUID}
-
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
@@ -33,9 +32,10 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
@@ -630,4 +630,73 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     sc.setLogLevel("ERROR")
     sqlContext = spark.sqlContext
   }
+
+  test("test Incremental View WithReplacement") {
+    List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
+      initSparkContext("testIncrementalViewWithReplacement")
+      val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+      val bootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+      val basePath = path.toAbsolutePath.toString
+      val baseBootStrapPath = bootStrapPath.toAbsolutePath.toString
+      val options = Map(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> tableType,
+        DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key -> "col3",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key -> "keyid",
-> "keyid", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key -> "", + DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator", + HoodieWriteConfig.TABLE_NAME.key -> "hoodie_test") + try { + val df = spark.range(0, 1000).toDF("keyid") + .withColumn("col3", expr("keyid")) + .withColumn("age", lit(1)) + .withColumn("p", lit(2)) + + df.write.format("hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert") + .option("hoodie.insert.shuffle.parallelism", "4") + .mode(SaveMode.Overwrite).save(basePath) + + df.write.format("hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert_overwrite_table") + .option("hoodie.insert.shuffle.parallelism", "4") + .mode(SaveMode.Append).save(basePath) + + val currentCommits = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) + val incrementalKeyIdNum = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "0000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, currentCommits(0)) + .load(basePath).select("keyid").orderBy("keyid").count + assert(incrementalKeyIdNum == 1000) + + // add bootstap test + df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath) + // boostrap table + spark.emptyDataFrame.write.format("hudi") + .options(options) + .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, baseBootStrapPath) + .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[NonpartitionedKeyGenerator].getCanonicalName) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key, "4") + .mode(SaveMode.Overwrite).save(basePath) + + df.write.format("hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert_overwrite_table") + .option("hoodie.insert.shuffle.parallelism", "4") + .mode(SaveMode.Append).save(basePath) + + val currentCommitsBootstrap = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) + val incrementalKeyIdNumBootstrap = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "0000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, currentCommitsBootstrap(0)) + .load(basePath).select("keyid").orderBy("keyid").count + assert(incrementalKeyIdNumBootstrap == 1000) + } finally { + spark.stop() + FileUtils.deleteDirectory(path.toFile) + FileUtils.deleteDirectory(bootStrapPath.toFile) + } + } + } }