HUDI-528 Handle empty commit in incremental pulling (#1612)
@@ -19,9 +19,9 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.GlobPattern
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ParquetUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.table.HoodieTable
@@ -47,8 +47,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
-  val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-  val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
   // MOR tables not supported yet
   if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
     throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
@@ -56,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
   // TODO : Figure out a valid HoodieWriteConfig
   private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
     sqlContext.sparkContext.hadoopConfiguration)
-  val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+  private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
   }
@@ -65,25 +64,21 @@ class IncrementalRelation(val sqlContext: SQLContext,
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
   }
 
-  val lastInstant = commitTimeline.lastInstant().get()
+  private val lastInstant = commitTimeline.lastInstant().get()
 
-  val commitsToReturn = commitTimeline.findInstantsInRange(
+  private val commitsToReturn = commitTimeline.findInstantsInRange(
     optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
     .getInstants.iterator().toList
 
-  // use schema from a file produced in the latest instant
-  val latestSchema = {
-    // use last instant if instant range is empty
-    val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
-    val latestMeta = HoodieCommitMetadata
-      .fromBytes(commitTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
-    val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
-    AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
-      sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
   }
 
-  val filters = {
+  private val filters = {
     if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
       val filterStr = optParams.getOrElse(
         DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
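
The core of the fix is the schema lookup above: IncrementalRelation previously derived its Spark schema by opening a parquet file listed in the latest commit's metadata, which fails when that commit wrote no files. It now asks TableSchemaResolver for the table schema held in commit metadata. A minimal standalone sketch of the new lookup, assuming an existing Hudi table; the hadoopConf and basePath values are illustrative, not part of this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

object SchemaFromMetadataSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()   // illustrative; the relation takes this from the SQLContext
    val basePath = "/tmp/hudi_trips_cow"   // illustrative path to an existing Hudi table

    // Same constructor call as in the relation above; the third argument
    // eagerly loads the active timeline.
    val metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true)

    // Resolve the schema from commit metadata rather than from a data file,
    // so the lookup succeeds even when the latest commit wrote no files.
    val schemaUtil = new TableSchemaResolver(metaClient)
    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
    val structType = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
    println(structType.treeString)
  }
}
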
@@ -126,6 +126,14 @@ class TestDataSource {
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
+    // Upsert an empty dataFrame
+    val emptyRecords = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 0)).toList
+    val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+    emptyDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
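
The new test writes a zero-record upsert and then issues an incremental pull across it. A condensed sketch of the same scenario outside the test harness, assuming a Hudi copy-on-write table already exists at basePath; the option keys are the string values behind the DataSourceReadOptions constants used in this diff, and basePath and the begin instant are illustrative:

import org.apache.spark.sql.SparkSession

object EmptyCommitIncrementalPullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("empty-commit-incremental-pull")
      .master("local[2]")
      .getOrCreate()

    val basePath = "/tmp/hudi_trips_cow" // illustrative; an existing COW table whose latest commit may be empty

    // Option keys spelled out as the string values of QUERY_TYPE_OPT_KEY and
    // BEGIN_INSTANTTIME_OPT_KEY as of this change.
    val incDF = spark.read.format("org.apache.hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "000") // pull all commits after instant "000"
      .load(basePath)

    // With the fix, this read no longer fails on schema resolution when the
    // newest commit contains no data files; it simply returns matching records.
    println(s"pulled ${incDF.count()} records")
    spark.stop()
  }
}
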