[HUDI-4436] Invalidate cached table in Spark after write (#6159)
Co-authored-by: Ryan Pifer <rmpifer@umich.edu>
This commit is contained in:
@@ -635,9 +635,37 @@ object HoodieSparkSqlWriter {
|
||||
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat)
|
||||
})
|
||||
}
|
||||
|
||||
// Since Hive tables are now synced as Spark data source tables, which are cached after Spark SQL queries,
|
||||
// we must invalidate this table in the cache so that writes are reflected in later queries.
|
||||
if (metaSyncEnabled) {
|
||||
getHiveTableNames(hoodieConfig).foreach(name => {
|
||||
val qualifiedTableName = String.join(".", hoodieConfig.getStringOrDefault(HIVE_DATABASE), name)
|
||||
if (spark.catalog.tableExists(qualifiedTableName)) {
|
||||
spark.catalog.refreshTable(qualifiedTableName)
|
||||
}
|
||||
})
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/**
 * Builds the list of Hive table names associated with the given write configuration.
 *
 * When the configured table type equals `COW_TABLE_TYPE_OPT_VAL`, only the configured
 * Hive table name is returned. For any other table type (presumably merge-on-read;
 * confirm against the table type options) two names are returned, in this order:
 * the read-optimized table name and the snapshot table name.
 *
 * @param hoodieConfig configuration from which the Hive table name, the table type and
 *                     the "skip read-optimized suffix" flag are read
 * @return the Hive table names derived from the configuration
 */
private def getHiveTableNames(hoodieConfig: HoodieConfig): List[String] = {
  val baseName = hoodieConfig.getStringOrDefault(HIVE_TABLE)
  val isCopyOnWrite = hoodieConfig.getStringOrDefault(TABLE_TYPE).equals(COW_TABLE_TYPE_OPT_VAL)

  if (isCopyOnWrite) {
    List(baseName)
  } else {
    // The read-optimized table carries no suffix when the skip flag is set.
    val skipRoSuffix = hoodieConfig.getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE)
    val readOptimizedSuffix = if (skipRoSuffix) "" else HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE

    List(readOptimizedSuffix, HiveSyncTool.SUFFIX_SNAPSHOT_TABLE).map(suffix => baseName + suffix)
  }
}
|
||||
|
||||
/**
|
||||
* Group all table/action specific information into a case class.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user