1
0

[HUDI-1850][HUDI-3234] Fixing read of a empty table but with failed write (#2903)

This commit is contained in:
Sivabalan Narayanan
2022-01-23 14:23:21 -05:00
committed by GitHub
parent e72553accf
commit f7a77961e3
2 changed files with 72 additions and 18 deletions

View File

@@ -110,29 +110,32 @@ class DefaultSource extends RelationProvider
val queryType = parameters(QUERY_TYPE.key)
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
new EmptyRelation(sqlContext, metaClient)
} else {
(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
readPaths, metaClient)
(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
(MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath,
readPaths, metaClient)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, schema, metaClient)
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new IncrementalRelation(sqlContext, parameters, schema, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient)
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient)
case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters)
case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters)
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
s"isBootstrappedTable: $isBootstrappedTable ")
case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
s"isBootstrappedTable: $isBootstrappedTable ")
}
}
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
import scala.util.control.NonFatal
/**
* BaseRelation representing empty RDD.
* @param sqlContext instance of SqlContext.
*/
class EmptyRelation(val sqlContext: SQLContext, metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
override def schema: StructType = {
// do the best to find the table schema.
val schemaResolver = new TableSchemaResolver(metaClient)
try {
val avroSchema = schemaResolver.getTableAvroSchema
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
} catch {
case NonFatal(e) =>
StructType(Nil)
}
}
override def buildScan(): RDD[Row] = {
sqlContext.sparkContext.emptyRDD[Row]
}
}