diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 4643da506..89faa3bbb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -23,7 +23,6 @@ import org.apache.hudi.common.model.WriteOperationType import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.HiveSyncTool import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor -import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.log4j.LogManager diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f992a9702..5d6ebd620 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -437,7 +437,14 @@ object HoodieSparkSqlWriter { DataSourceWriteOptions.DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean if (syncAsDtaSourceTable) { hiveSyncConfig.tableProperties = parameters.getOrElse(HIVE_TABLE_PROPERTIES, null) - hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, basePath.toString) + val serdePropText = createSqlTableSerdeProperties(parameters, basePath.toString) + val serdeProp = ConfigUtils.toMap(serdePropText) + serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY) + serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + + hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp) + } hiveSyncConfig } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 10141fb23..cbae43a20 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -570,8 +570,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { "{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," + "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties) - - assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties) + assertResult("path=/tmp/hoodie_test\n" + + "spark.query.type.key=hoodie.datasource.query.type\n" + + "spark.query.as.rt.key=snapshot\n" + + "spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties) } test("Test build sync config for skip Ro Suffix vals") { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 18d133b6a..0dbe97f9b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -106,13 +106,13 @@ public class HiveSyncTool extends AbstractSyncTool { if (hoodieHiveClient != null) { switch (hoodieHiveClient.getTableType()) { case COPY_ON_WRITE: - syncHoodieTable(snapshotTableName, false); + syncHoodieTable(snapshotTableName, false, false); break; case MERGE_ON_READ: // sync a RO table for MOR - syncHoodieTable(roTableName.get(), false); + syncHoodieTable(roTableName.get(), false, true); // sync a RT table for MOR - syncHoodieTable(snapshotTableName, true); + syncHoodieTable(snapshotTableName, true, false); break; default: LOG.error("Unknown table type " + hoodieHiveClient.getTableType()); @@ -128,7 +128,8 @@ public class HiveSyncTool extends AbstractSyncTool { } } - private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) { + private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, + boolean readAsOptimized) { LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath() + " of type " + hoodieHiveClient.getTableType()); @@ -152,7 +153,7 @@ public class HiveSyncTool extends AbstractSyncTool { // Get the parquet schema for this table looking at the latest commit MessageType schema = hoodieHiveClient.getDataSchema(); // Sync schema if needed - syncSchema(tableName, tableExists, useRealtimeInputFormat, schema); + syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema); LOG.info("Schema sync complete. Syncing partitions for " + tableName); // Get the last time we successfully synced partitions @@ -177,7 +178,8 @@ public class HiveSyncTool extends AbstractSyncTool { * @param tableExists - does table exist * @param schema - extracted schema */ - private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) { + private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, + boolean readAsOptimized, MessageType schema) { // Check and sync schema if (!tableExists) { LOG.info("Hive table " + tableName + " is not found. Creating it"); @@ -194,11 +196,27 @@ public class HiveSyncTool extends AbstractSyncTool { String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat); String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); + Map serdeProperties = ConfigUtils.toMap(cfg.serdeProperties); + + // The serdeProperties is non-empty only for spark sync meta data currently. + if (!serdeProperties.isEmpty()) { + String queryTypeKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_TYPE_KEY); + String queryAsROKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RO_KEY); + String queryAsRTKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RT_KEY); + + if (queryTypeKey != null && queryAsROKey != null && queryAsRTKey != null) { + if (readAsOptimized) { // read optimized + serdeProperties.put(queryTypeKey, queryAsROKey); + } else { // read snapshot + serdeProperties.put(queryTypeKey, queryAsRTKey); + } + } + } // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive // /ql/exec/DDLTask.java#L3488 hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, - outputFormatClassName, serDeFormatClassName, ConfigUtils.toMap(cfg.serdeProperties), ConfigUtils.toMap(cfg.tableProperties)); + outputFormatClassName, serDeFormatClassName, serdeProperties, ConfigUtils.toMap(cfg.tableProperties)); } else { // Check if the table schema has evolved Map tableSchema = hoodieHiveClient.getTableSchema(tableName); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java index 8c9dfb636..b8745b6e3 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java @@ -24,6 +24,12 @@ import org.apache.hudi.common.util.StringUtils; public class ConfigUtils { + public static final String SPARK_QUERY_TYPE_KEY = "spark.query.type.key"; + + public static final String SPARK_QUERY_AS_RO_KEY = "spark.query.as.ro.key"; + + public static final String SPARK_QUERY_AS_RT_KEY = "spark.query.as.rt.key"; + /** * Convert the key-value config to a map.The format of the config * is a key-value pair just like "k1=v1\nk2=v2\nk3=v3". diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 118ad121a..4324a64f7 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -264,11 +264,15 @@ public class TestHiveSyncTool { @ParameterizedTest @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) - public void testSyncWithProperties(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { + public void testSyncCOWTableWithProperties(boolean useJdbc, + boolean useSchemaFromCommitMetadata) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); + put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); + put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); + put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); } }; @@ -304,10 +308,79 @@ public class TestHiveSyncTool { assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); results.clear(); + // validate serde properties hiveDriver.run("SHOW CREATE TABLE " + dbTableName); hiveDriver.getResults(results); String ddl = String.join("\n", results); assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); + assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'")); + } + + @ParameterizedTest + @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) + public void testSyncMORTableWithProperties(boolean useJdbc, + boolean useSchemaFromCommitMetadata) throws Exception { + HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; + Map serdeProperties = new HashMap() { + { + put("path", hiveSyncConfig.basePath); + put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type"); + put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized"); + put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot"); + } + }; + + Map tableProperties = new HashMap() { + { + put("tp_0", "p0"); + put("tp_1", "p1"); + } + }; + hiveSyncConfig.useJdbc = useJdbc; + hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); + hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); + String instantTime = "100"; + String deltaCommitTime = "101"; + HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, + useSchemaFromCommitMetadata); + + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + String roTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; + String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; + + String[] tableNames = new String[] {roTableName, rtTableName}; + String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"}; + + SessionState.start(HiveTestUtil.getHiveConf()); + Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf()); + + for (int i = 0;i < 2; i++) { + String dbTableName = hiveSyncConfig.databaseName + "." + tableNames[i]; + String expectQueryType = expectQueryTypes[i]; + + hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName); + List results = new ArrayList<>(); + hiveDriver.getResults(results); + + String tblPropertiesWithoutDdlTime = String.join("\n", + results.subList(0, results.size() - 1)); + assertEquals( + "EXTERNAL\tTRUE\n" + + "last_commit_time_sync\t101\n" + + "tp_0\tp0\n" + + "tp_1\tp1", tblPropertiesWithoutDdlTime); + assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); + + results.clear(); + // validate serde properties + hiveDriver.run("SHOW CREATE TABLE " + dbTableName); + hiveDriver.getResults(results); + String ddl = String.join("\n", results); + assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); + assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'")); + } } @ParameterizedTest diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 74f28ccda..1d6bfb442 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -188,7 +188,8 @@ public class HiveTestUtil { DateTime dateTime = DateTime.now(); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, useSchemaFromCommitMetadata, dateTime, commitTime); - createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); + createdTablesSet + .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); createdTablesSet .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();