From a5348cc6854e7b35b008fb922868d3b9b2a714f9 Mon Sep 17 00:00:00 2001 From: Rahil C <32500120+rahil-c@users.noreply.github.com> Date: Fri, 22 Jul 2022 22:47:47 -0700 Subject: [PATCH] [HUDI-4436] Invalidate cached table in Spark after write (#6159) Co-authored-by: Ryan Pifer --- .../apache/hudi/HoodieSparkSqlWriter.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 13d87c048..7324a5ca5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -635,9 +635,37 @@ object HoodieSparkSqlWriter { SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat) }) } + + // Since Hive tables are now synced as Spark data source tables which are cached after Spark SQL queries + // we must invalidate this table in the cache so writes are reflected in later queries + if (metaSyncEnabled) { + getHiveTableNames(hoodieConfig).foreach(name => { + val qualifiedTableName = String.join(".", hoodieConfig.getStringOrDefault(HIVE_DATABASE), name) + if (spark.catalog.tableExists(qualifiedTableName)) { + spark.catalog.refreshTable(qualifiedTableName) + } + }) + } true } + private def getHiveTableNames(hoodieConfig: HoodieConfig): List[String] = { + val tableName = hoodieConfig.getStringOrDefault(HIVE_TABLE) + val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE) + + if (tableType.equals(COW_TABLE_TYPE_OPT_VAL)) { + List(tableName) + } else { + val roSuffix = if (hoodieConfig.getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE)) { + "" + } else { + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE + } + List(tableName + roSuffix, + tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE) + } + } + /** * Group all table/action specific information into a case class. */