diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3e6f7e4b4..8a68d3e33 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -456,6 +456,12 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("")
 
+  // Create table as managed table
+  val HIVE_CREATE_MANAGED_TABLE: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.datasource.hive_sync.create_managed_table")
+    .defaultValue(false)
+    .withDocumentation("Whether to sync the table as managed table.")
+
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.compaction.async.enable")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fd70a42dd..cecb9dec7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -439,8 +439,8 @@ object HoodieSparkSqlWriter {
       serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
     }
-
+    hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
     hiveSyncConfig
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index d3239e0dd..586e91685 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -73,6 +73,8 @@ object HoodieWriterUtils {
       HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.defaultValue,
       HIVE_STYLE_PARTITIONING_OPT_KEY.key -> HIVE_STYLE_PARTITIONING_OPT_KEY.defaultValue,
       HIVE_USE_JDBC_OPT_KEY.key -> HIVE_USE_JDBC_OPT_KEY.defaultValue,
+      HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
+      HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
       ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
       ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
     ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index bdddb1c6d..64eff9ad7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -69,6 +70,9 @@ case class CreateHoodieTableAsSelectCommand(
 
     // Execute the insert query
     try {
+      // Set if sync as a managed table.
+      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key(),
+        (table.tableType == CatalogTableType.MANAGED).toString)
       val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
         mode == SaveMode.Overwrite, refreshTable = false)
       if (success) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index a9691c412..19d2a08ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -542,7 +542,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       "path" -> basePath,
       DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key -> "test_hoodie",
       DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key -> "partition",
-      DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true"
+      DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true",
+      DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> "true"
     )
     val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
     val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
@@ -559,6 +560,7 @@
       new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig]
     assertTrue(hiveSyncConfig.skipROSuffix)
+    assertTrue(hiveSyncConfig.createManagedTable)
     assertResult("spark.sql.sources.provider=hudi\n" +
       "spark.sql.sources.schema.partCol.0=partition\n" +
       "spark.sql.sources.schema.numParts=1\n" +
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index c085aa0e7..3d4cc5b20 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -104,6 +104,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
   public Boolean decodePartition = false;
 
+  @Parameter(names = {"--managed-table"}, description = "Create a managed table")
+  public Boolean createManagedTable = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -123,6 +126,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.decodePartition = cfg.decodePartition;
     newConfig.tableProperties = cfg.tableProperties;
     newConfig.serdeProperties = cfg.serdeProperties;
+    newConfig.createManagedTable = cfg.createManagedTable;
     return newConfig;
   }
@@ -151,6 +155,7 @@
       + ", help=" + help
       + ", supportTimestamp=" + supportTimestamp
       + ", decodePartition=" + decodePartition
+      + ", createManagedTable=" + createManagedTable
       + '}';
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 6eea71644..7af54bb05 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -413,7 +413,12 @@ public class HiveSchemaUtil {
     }
     String partitionsStr = String.join(",", partitionFields);
-    StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+    StringBuilder sb = new StringBuilder();
+    if (config.createManagedTable) {
+      sb.append("CREATE TABLE IF NOT EXISTS ");
+    } else {
+      sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
+    }
     sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
       .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
     sb.append("( ").append(columns).append(")");
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 09bf7563e..f0e171b4b 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -66,6 +66,10 @@ public class TestHiveSyncTool {
     return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
   }
 
+  private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadataAndManagedTable() {
+    return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
+  }
+
   @BeforeEach
   public void setUp() throws Exception {
     HiveTestUtil.setUp();
@@ -269,6 +273,38 @@
       String ddl = String.join("\n", results);
       assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
       assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'"));
+      assertTrue(ddl.toLowerCase().contains("create external table"));
     }
   }
+
+  @ParameterizedTest
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadataAndManagedTable"})
+  public void testSyncManagedTable(boolean useJdbc,
+                                   boolean useSchemaFromCommitMetadata,
+                                   boolean isManagedTable) throws Exception {
+    HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
+
+    hiveSyncConfig.useJdbc = useJdbc;
+    hiveSyncConfig.createManagedTable = isManagedTable;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
+
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    SessionState.start(HiveTestUtil.getHiveConf());
+    Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());
+    String dbTableName = hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName;
+    hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
+
+    List<String> results = new ArrayList<>();
+    hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
+    hiveDriver.getResults(results);
+    String ddl = String.join("\n", results).toLowerCase();
+    if (isManagedTable) {
+      assertTrue(ddl.contains("create table"));
+    } else {
+      assertTrue(ddl.contains("create external table"));
+    }
+  }