diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 13d87c048..7324a5ca5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -635,9 +635,37 @@ object HoodieSparkSqlWriter {
         SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, fs.getConf, fs, basePath.toString, baseFileFormat)
       })
     }
+
+    // Since Hive tables are now synced as Spark data source tables, which are cached after Spark SQL queries,
+    // we must invalidate these tables in the cache so writes are reflected in later queries.
+    if (metaSyncEnabled) {
+      getHiveTableNames(hoodieConfig).foreach(name => {
+        val qualifiedTableName = String.join(".", hoodieConfig.getStringOrDefault(HIVE_DATABASE), name)
+        if (spark.catalog.tableExists(qualifiedTableName)) {
+          spark.catalog.refreshTable(qualifiedTableName)
+        }
+      })
+    }
     true
   }
 
+  private def getHiveTableNames(hoodieConfig: HoodieConfig): List[String] = {
+    val tableName = hoodieConfig.getStringOrDefault(HIVE_TABLE)
+    val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
+
+    if (tableType.equals(COW_TABLE_TYPE_OPT_VAL)) {
+      List(tableName)
+    } else {
+      val roSuffix = if (hoodieConfig.getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE)) {
+        ""
+      } else {
+        HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE
+      }
+      List(tableName + roSuffix,
+        tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE)
+    }
+  }
+
   /**
    * Group all table/action specific information into a case class.
    */
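
The refresh block above relies on Spark's Catalog API: once a table is resolved through the catalog, its relation (schema and file listing) stays cached, so a Hudi write that lands outside Spark's knowledge leaves later queries reading stale metadata. Below is a minimal standalone sketch of that pattern; `spark.catalog.tableExists` and `spark.catalog.refreshTable` are the same real Spark calls the patch uses, while the app name and the table name `db.hudi_trips` are hypothetical placeholders.

```scala
import org.apache.spark.sql.SparkSession

object RefreshAfterSyncSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical session setup; in the patch this SparkSession already exists.
    val spark = SparkSession.builder()
      .appName("refresh-after-sync")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical qualified name; the patch builds it from HIVE_DATABASE + table name.
    val qualifiedTableName = "db.hudi_trips"

    // refreshTable drops the cached relation so the next query re-resolves the
    // table and sees files written by the just-completed Hudi commit.
    if (spark.catalog.tableExists(qualifiedTableName)) {
      spark.catalog.refreshTable(qualifiedTableName)
    }
  }
}
```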
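For the `getHiveTableNames` helper, the point is that a copy-on-write table syncs a single Hive table, while a merge-on-read table syncs two (read-optimized and snapshot). The sketch below restates that logic with the config plumbing stripped out so the expected names are easy to check; the literal `"_ro"`/`"_rt"` suffixes are an assumption standing in for `HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE` and `HiveSyncTool.SUFFIX_SNAPSHOT_TABLE`, so verify them against the Hudi version in use.

```scala
object HiveTableNamesSketch {
  // Simplified stand-in for getHiveTableNames: booleans replace the
  // TABLE_TYPE and HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE configs.
  def hiveTableNames(tableName: String, isCow: Boolean, skipRoSuffix: Boolean): List[String] =
    if (isCow) {
      List(tableName)
    } else {
      // Assumed suffix values; the patch reads them from HiveSyncTool constants.
      val roSuffix = if (skipRoSuffix) "" else "_ro"
      List(tableName + roSuffix, tableName + "_rt")
    }

  def main(args: Array[String]): Unit = {
    // COW: one Hive table under the base name.
    assert(hiveTableNames("trips", isCow = true, skipRoSuffix = false) == List("trips"))
    // MOR: a read-optimized table and a snapshot table.
    assert(hiveTableNames("trips", isCow = false, skipRoSuffix = false) == List("trips_ro", "trips_rt"))
    // MOR with the skip-RO-suffix option: the read-optimized table keeps the base name.
    assert(hiveTableNames("trips", isCow = false, skipRoSuffix = true) == List("trips", "trips_rt"))
  }
}
```

This also shows why the refresh loop iterates over a list of names rather than refreshing a single table: for merge-on-read, both synced Hive tables can be cached independently and both need invalidation.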