
[HUDI-2711] Fallback to fulltable scan for IncrementalRelation if underlying files have been cleared or moved by cleaner (#3946)

Co-authored-by: sivabalan <n.siva.b@gmail.com>
This commit is contained in:
jsbali
2022-02-01 09:33:18 +05:30
committed by GitHub
parent 4b388c104e
commit 7ce0f4522b
5 changed files with 206 additions and 27 deletions

DataSourceReadOptions.scala (View File)

@@ -122,6 +122,10 @@ object DataSourceReadOptions {
.withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
"skipping over files")
val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
.withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.")
/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
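
For reference, a minimal usage sketch of the new option on the reader side (the table path and begin instant below are hypothetical, not part of this commit): with the flag enabled, an incremental query that references cleaned or archived file slices falls back to scanning the table and filtering on commit time instead of failing.

// Hypothetical example, not from this commit: path and instant time are placeholders.
import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("incr-fallback-example").getOrCreate()

val incrDF = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "20220101000000")
  .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key, "true")
  .load("/tmp/hudi/example_table")

incrDF.count()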

IncrementalRelation.scala (View File)

@@ -18,16 +18,17 @@
package org.apache.hudi
import org.apache.avro.Schema
import java.util.stream.Collectors
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.HoodieTimer
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieSparkTable
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
@@ -40,11 +41,11 @@ import scala.collection.JavaConversions._
import scala.collection.mutable
/**
 * Relation that implements the Hoodie incremental view.
 *
 * Implemented for Copy-on-Write storage.
 *
 */
class IncrementalRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
@@ -85,7 +86,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val tableSchema = if (useEndInstantSchema) {
if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
} else {
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
@@ -165,26 +166,63 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters)
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
var doFullTableScan = false
if (fallbackToFullTableScan) {
val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val timer = new HoodieTimer().startTimer()
val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")
val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first
if (isInstantArchived || firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan")
}
}
if (doFullTableScan) {
val hudiDF = sqlContext.read
.format("hudi")
.schema(usedSchema)
.load(basePath)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp))
// Schema enforcement does not happen in the spark.read above, hence select columns explicitly in the right order
val fieldNames : Array[String] = df.schema.fields.map(field => field.name)
df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
}
if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema)
.parquet(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
}
}
filters.foldLeft(df)((e, f) => e.filter(f)).rdd
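
A side note on the archived-instant check above: Hudi instant times are fixed-format timestamp strings, so plain lexicographic comparison matches chronological order. A tiny illustrative sketch with made-up instant times (assumptions, not from the commit):

// Illustration only; both instant times below are hypothetical.
val firstActiveInstant = "20220201093318"    // first instant still on the active timeline
val requestedBeginInstant = "20220115080000" // BEGIN_INSTANTTIME supplied by the reader
val isInstantArchived = requestedBeginInstant.compareTo(firstActiveInstant) < 0
// isInstantArchived is true here, so the relation falls back to a full table scan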

TestCOWDataSource.scala (View File)

@@ -34,7 +34,8 @@ import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -708,6 +709,90 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
}
@Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
// Create 10 commits
for (i <- 1 to 10) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.cleaner.commits.retained", "3")
.option("hoodie.keep.min.commits", "4")
.option("hoodie.keep.max.commits", "5")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieMetadataConfig.ENABLE.key(), value = false)
.mode(SaveMode.Append)
.save(basePath)
}
val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
/**
* State of timeline after 10 commits
* +------------------+--------------------------------------+
* | Archived | Active Timeline |
* +------------------+--------------+-----------------------+
* | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 |
* +------------------+--------------+-----------------------+
* | Data cleaned | Data exists in table |
* +---------------------------------+-----------------------+
*/
val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
//Instants at index < 2 (C4, C5) are still on the active timeline (their commit files are not archived), but their data files have already been cleaned
var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4
var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5
//Calling without the fallback should result in Path does not exist
var hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)
val msg = "Should fail with Path does not exist"
assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)
//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
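//Each commit above inserts 100 records, so the (C4, C5] window returns the 100 records written by C5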
assertEquals(100, hoodieIncViewDF.count())
//Test out for archived commits
val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
endTs = completedCommits.nthInstant(1).get().getTimestamp //C5
//Calling without the fallback should result in Path does not exist
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)
assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)
//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
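//With startTs = C0 (archived), the full-scan filter _hoodie_commit_time > C0 and <= C5 matches C1..C5, i.e. 5 commits of 100 inserts each = 500 records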
assertEquals(500, hoodieIncViewDF.count())
}
def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))

HoodieIncrSource.java (View File)

@@ -135,7 +135,10 @@ public class HoodieIncrSource extends RowSource {
DataFrameReader reader = sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight())
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()));
Dataset<Row> source = reader.load(srcPath);
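
For the DeltaStreamer path, the option is picked up from the source properties; a small sketch (assumed setup, not from this commit) of how the value would typically be supplied and resolved, mirroring the props.getString call above:

// Hypothetical properties setup; in practice this is usually set via --hoodie-conf
// or the source properties file consumed by HoodieDeltaStreamer.
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.config.TypedProperties

val props = new TypedProperties()
props.setProperty(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key, "true")

// Same resolution pattern as HoodieIncrSource: fall back to the datasource default ("false") when unset.
val fallbackEnabled = props.getString(
  DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
  DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue)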

TestHoodieDeltaStreamer.java (View File)

@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -1739,6 +1740,54 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
}
@Test
public void testHoodieIncrFallback() throws Exception {
String tableBasePath = dfsBasePath + "/incr_test_table";
String downstreamTableBasePath = dfsBasePath + "/incr_test_downstream_table";
insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true, null);
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
//Record count stays unchanged because this sync fails with a 'Path does not exist' error
assertThrows(org.apache.spark.sql.AnalysisException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*", sqlContext);
if (downstreamCfg.configs == null) {
downstreamCfg.configs = new ArrayList<>();
}
downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
//Adding this conf to make testing easier :)
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
downstreamCfg.operation = WriteOperationType.UPSERT;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").count();
long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath + "/*/*.parquet").count();
assertEquals(baseTableRecords, downStreamTableRecords);
}
private void insertInTable(String tableBasePath, int count, WriteOperationType operationType) throws Exception {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, operationType,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false);
if (cfg.configs == null) {
cfg.configs = new ArrayList<>();
}
cfg.configs.add("hoodie.cleaner.commits.retained=3");
cfg.configs.add("hoodie.keep.min.commits=4");
cfg.configs.add("hoodie.keep.max.commits=5");
cfg.configs.add("hoodie.test.source.generate.inserts=true");
for (int i = 0; i < count; i++) {
new HoodieDeltaStreamer(cfg, jsc).sync();
}
}
@Test
public void testInsertOverwrite() throws Exception {
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite", WriteOperationType.INSERT_OVERWRITE);