[HUDI-2058]support incremental query for insert_overwrite_table/insert_overwrite operation on cow table (#3139)
This commit is contained in:
@@ -17,13 +17,16 @@
|
|||||||
|
|
||||||
package org.apache.hudi
|
package org.apache.hudi
|
||||||
|
|
||||||
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
|
import java.util.stream.Collectors
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
|
||||||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline
|
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.exception.HoodieException
|
import org.apache.hudi.exception.HoodieException
|
||||||
import org.apache.hadoop.fs.GlobPattern
|
import org.apache.hadoop.fs.GlobPattern
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
import org.apache.hudi.client.common.HoodieSparkEngineContext
|
||||||
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
import org.apache.hudi.table.HoodieSparkTable
|
import org.apache.hudi.table.HoodieSparkTable
|
||||||
import org.apache.log4j.LogManager
|
import org.apache.log4j.LogManager
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
@@ -68,10 +71,10 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
|
|
||||||
private val lastInstant = commitTimeline.lastInstant().get()
|
private val lastInstant = commitTimeline.lastInstant().get()
|
||||||
|
|
||||||
private val commitsToReturn = commitTimeline.findInstantsInRange(
|
private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
|
||||||
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key),
|
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key),
|
||||||
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, lastInstant.getTimestamp))
|
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key(), lastInstant.getTimestamp))
|
||||||
.getInstants.iterator().toList
|
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
|
||||||
|
|
||||||
// use schema from a file produced in the end/latest instant
|
// use schema from a file produced in the end/latest instant
|
||||||
val usedSchema: StructType = {
|
val usedSchema: StructType = {
|
||||||
@@ -96,14 +99,31 @@ class IncrementalRelation(val sqlContext: SQLContext,
|
|||||||
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
val regularFileIdToFullPath = mutable.HashMap[String, String]()
|
||||||
var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
|
var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
|
||||||
|
|
||||||
|
// create Replaced file group
|
||||||
|
val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
|
||||||
|
val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
|
||||||
|
val replaceMetadata = HoodieReplaceCommitMetadata.
|
||||||
|
fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
|
||||||
|
replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
|
||||||
|
entry.getValue.map { e =>
|
||||||
|
val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
|
||||||
|
(e, fullPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.toMap
|
||||||
|
|
||||||
for (commit <- commitsToReturn) {
|
for (commit <- commitsToReturn) {
|
||||||
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
|
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
|
||||||
.get, classOf[HoodieCommitMetadata])
|
.get, classOf[HoodieCommitMetadata])
|
||||||
|
|
||||||
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
|
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
|
||||||
metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
|
||||||
|
replacedFile.contains(k) && v.startsWith(replacedFile(k))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
|
regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
|
||||||
|
replacedFile.contains(k) && v.startsWith(replacedFile(k))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ package org.apache.hudi.functional
|
|||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.{Collections, Date, UUID}
|
import java.util.{Collections, Date, UUID}
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
import org.apache.hadoop.fs.Path
|
import org.apache.hadoop.fs.Path
|
||||||
import org.apache.hudi.DataSourceWriteOptions._
|
import org.apache.hudi.DataSourceWriteOptions._
|
||||||
@@ -33,9 +32,10 @@ import org.apache.hudi.exception.HoodieException
|
|||||||
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
|
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
|
||||||
import org.apache.hudi.hive.HiveSyncConfig
|
import org.apache.hudi.hive.HiveSyncConfig
|
||||||
import org.apache.hudi.testutils.DataSourceTestUtils
|
import org.apache.hudi.testutils.DataSourceTestUtils
|
||||||
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
|
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
|
||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
|
import org.apache.spark.sql.functions.{expr, lit}
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
|
||||||
@@ -630,4 +630,74 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
|
|||||||
sc.setLogLevel("ERROR")
|
sc.setLogLevel("ERROR")
|
||||||
sqlContext = spark.sqlContext
|
sqlContext = spark.sqlContext
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("test Incremental View WithReplacement") {
|
||||||
|
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
|
||||||
|
initSparkContext("testNonPartitionTableWithMetaTable")
|
||||||
|
initSparkContext("test_schema_evolution")
|
||||||
|
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
|
||||||
|
val bootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
|
||||||
|
val basePath = path.toAbsolutePath.toString
|
||||||
|
val baseBootStrapPath = bootStrapPath.toAbsolutePath.toString
|
||||||
|
val options = Map(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> tableType,
|
||||||
|
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key -> "col3",
|
||||||
|
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key -> "keyid",
|
||||||
|
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key -> "",
|
||||||
|
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
|
||||||
|
HoodieWriteConfig.TABLE_NAME.key -> "hoodie_test")
|
||||||
|
try {
|
||||||
|
val df = spark.range(0, 1000).toDF("keyid")
|
||||||
|
.withColumn("col3", expr("keyid"))
|
||||||
|
.withColumn("age", lit(1))
|
||||||
|
.withColumn("p", lit(2))
|
||||||
|
|
||||||
|
df.write.format("hudi")
|
||||||
|
.options(options)
|
||||||
|
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert")
|
||||||
|
.option("hoodie.insert.shuffle.parallelism", "4")
|
||||||
|
.mode(SaveMode.Overwrite).save(basePath)
|
||||||
|
|
||||||
|
df.write.format("hudi")
|
||||||
|
.options(options)
|
||||||
|
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert_overwrite_table")
|
||||||
|
.option("hoodie.insert.shuffle.parallelism", "4")
|
||||||
|
.mode(SaveMode.Append).save(basePath)
|
||||||
|
|
||||||
|
val currentCommits = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
|
||||||
|
val incrementalKeyIdNum = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
|
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "0000")
|
||||||
|
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, currentCommits(0))
|
||||||
|
.load(basePath).select("keyid").orderBy("keyid").count
|
||||||
|
assert(incrementalKeyIdNum == 1000)
|
||||||
|
|
||||||
|
// add bootstap test
|
||||||
|
df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
|
||||||
|
// boostrap table
|
||||||
|
spark.emptyDataFrame.write.format("hudi")
|
||||||
|
.options(options)
|
||||||
|
.option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, baseBootStrapPath)
|
||||||
|
.option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
|
||||||
|
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
|
||||||
|
.option(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key, "4")
|
||||||
|
.mode(SaveMode.Overwrite).save(basePath)
|
||||||
|
|
||||||
|
df.write.format("hudi")
|
||||||
|
.options(options)
|
||||||
|
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert_overwrite_table")
|
||||||
|
.option("hoodie.insert.shuffle.parallelism", "4")
|
||||||
|
.mode(SaveMode.Append).save(basePath)
|
||||||
|
|
||||||
|
val currentCommitsBootstrap = spark.read.format("hudi").load(basePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
|
||||||
|
val incrementalKeyIdNumBootstrap = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
|
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key, "0000")
|
||||||
|
.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY.key, currentCommitsBootstrap(0))
|
||||||
|
.load(basePath).select("keyid").orderBy("keyid").count
|
||||||
|
assert(incrementalKeyIdNumBootstrap == 1000)
|
||||||
|
} finally {
|
||||||
|
spark.stop()
|
||||||
|
FileUtils.deleteDirectory(path.toFile)
|
||||||
|
FileUtils.deleteDirectory(bootStrapPath.toFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user