1
0

[HUDI-2057] CTAS Generates An External Table When Creating A Managed Table (#3146)

This commit is contained in:
pengzhiwei
2021-07-03 15:55:36 +08:00
committed by GitHub
parent 7173d1338a
commit 4f215e2938
8 changed files with 63 additions and 3 deletions

View File

@@ -456,6 +456,12 @@ object DataSourceWriteOptions {
.defaultValue("true") .defaultValue("true")
.withDocumentation("") .withDocumentation("")
// Create table as managed table
val HIVE_CREATE_MANAGED_TABLE: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.datasource.hive_sync.create_managed_table")
.defaultValue(false)
.withDocumentation("Whether to sync the table as managed table.")
// Async Compaction - Enabled by default for MOR // Async Compaction - Enabled by default for MOR
val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.compaction.async.enable") .key("hoodie.datasource.compaction.async.enable")

View File

@@ -439,8 +439,8 @@ object HoodieSparkSqlWriter {
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp) hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
} }
hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
hiveSyncConfig hiveSyncConfig
} }

View File

@@ -73,6 +73,8 @@ object HoodieWriterUtils {
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.defaultValue, HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.key -> HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.defaultValue,
HIVE_STYLE_PARTITIONING_OPT_KEY.key -> HIVE_STYLE_PARTITIONING_OPT_KEY.defaultValue, HIVE_STYLE_PARTITIONING_OPT_KEY.key -> HIVE_STYLE_PARTITIONING_OPT_KEY.defaultValue,
HIVE_USE_JDBC_OPT_KEY.key -> HIVE_USE_JDBC_OPT_KEY.defaultValue, HIVE_USE_JDBC_OPT_KEY.key -> HIVE_USE_JDBC_OPT_KEY.defaultValue,
HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue, ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters) ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)

View File

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -69,6 +70,9 @@ case class CreateHoodieTableAsSelectCommand(
// Execute the insert query // Execute the insert query
try { try {
// Set if sync as a managed table.
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key(),
(table.tableType == CatalogTableType.MANAGED).toString)
val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty, val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
mode == SaveMode.Overwrite, refreshTable = false) mode == SaveMode.Overwrite, refreshTable = false)
if (success) { if (success) {

View File

@@ -542,7 +542,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
"path" -> basePath, "path" -> basePath,
DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key -> "test_hoodie", DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key -> "test_hoodie",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key -> "partition", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY.key -> "partition",
DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true" DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX.key -> "true",
DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> "true"
) )
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters) val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
@@ -559,6 +560,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig] new Path(basePath), newHoodieConfig).asInstanceOf[HiveSyncConfig]
assertTrue(hiveSyncConfig.skipROSuffix) assertTrue(hiveSyncConfig.skipROSuffix)
assertTrue(hiveSyncConfig.createManagedTable)
assertResult("spark.sql.sources.provider=hudi\n" + assertResult("spark.sql.sources.provider=hudi\n" +
"spark.sql.sources.schema.partCol.0=partition\n" + "spark.sql.sources.schema.partCol.0=partition\n" +
"spark.sql.sources.schema.numParts=1\n" + "spark.sql.sources.schema.numParts=1\n" +

View File

@@ -104,6 +104,9 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing") @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
public Boolean decodePartition = false; public Boolean decodePartition = false;
@Parameter(names = {"--managed-table"}, description = "Create a managed table")
public Boolean createManagedTable = false;
// enhance the similar function in child class // enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) { public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig(); HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -123,6 +126,7 @@ public class HiveSyncConfig implements Serializable {
newConfig.decodePartition = cfg.decodePartition; newConfig.decodePartition = cfg.decodePartition;
newConfig.tableProperties = cfg.tableProperties; newConfig.tableProperties = cfg.tableProperties;
newConfig.serdeProperties = cfg.serdeProperties; newConfig.serdeProperties = cfg.serdeProperties;
newConfig.createManagedTable = cfg.createManagedTable;
return newConfig; return newConfig;
} }
@@ -151,6 +155,7 @@ public class HiveSyncConfig implements Serializable {
+ ", help=" + help + ", help=" + help
+ ", supportTimestamp=" + supportTimestamp + ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition + ", decodePartition=" + decodePartition
+ ", createManagedTable=" + createManagedTable
+ '}'; + '}';
} }
} }

View File

@@ -413,7 +413,12 @@ public class HiveSchemaUtil {
} }
String partitionsStr = String.join(",", partitionFields); String partitionsStr = String.join(",", partitionFields);
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS "); StringBuilder sb = new StringBuilder();
if (config.createManagedTable) {
sb.append("CREATE TABLE IF NOT EXISTS ");
} else {
sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
}
sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER) sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER); .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
sb.append("( ").append(columns).append(")"); sb.append("( ").append(columns).append(")");

View File

@@ -66,6 +66,10 @@ public class TestHiveSyncTool {
return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}}); return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
} }
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadataAndManagedTable() {
return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
}
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() throws Exception {
HiveTestUtil.setUp(); HiveTestUtil.setUp();
@@ -269,6 +273,38 @@ public class TestHiveSyncTool {
String ddl = String.join("\n", results); String ddl = String.join("\n", results);
assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'")); assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'"));
assertTrue(ddl.toLowerCase().contains("create external table"));
}
}
@ParameterizedTest
@MethodSource({"useJdbcAndSchemaFromCommitMetadataAndManagedTable"})
public void testSyncManagedTable(boolean useJdbc,
boolean useSchemaFromCommitMetadata,
boolean isManagedTable) throws Exception {
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
hiveSyncConfig.useJdbc = useJdbc;
hiveSyncConfig.createManagedTable = isManagedTable;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
SessionState.start(HiveTestUtil.getHiveConf());
Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());
String dbTableName = hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName;
hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
List<String> results = new ArrayList<>();
hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
hiveDriver.getResults(results);
String ddl = String.join("\n", results).toLowerCase();
if (isManagedTable) {
assertTrue(ddl.contains("create table"));
} else {
assertTrue(ddl.contains("create external table"));
} }
} }