diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index e58f5e1f5..8dd40adf1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -81,10 +81,10 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable {
       .withDocumentation("Field used in preCombining before actual write. By default, when two records have the same key value, "
           + "the largest value for the precombine field determined by Object.compareTo(..), is picked.");
 
-  public static final ConfigProperty<String> HOODIE_TABLE_PARTITION_COLUMNS_PROP = ConfigProperty
-      .key("hoodie.table.partition.columns")
+  public static final ConfigProperty<String> HOODIE_TABLE_PARTITION_FIELDS_PROP = ConfigProperty
+      .key("hoodie.table.partition.fields")
       .noDefaultValue()
-      .withDocumentation("Columns used to partition the table. Concatenated values of these fields are used as "
+      .withDocumentation("Fields used to partition the table. Concatenated values of these fields are used as "
           + "the partition path, by invoking toString()");
 
   public static final ConfigProperty<String> HOODIE_TABLE_RECORDKEY_FIELDS = ConfigProperty
@@ -250,9 +250,17 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable {
     return getString(HOODIE_TABLE_PRECOMBINE_FIELD_PROP);
   }
 
-  public Option<String[]> getPartitionColumns() {
-    if (contains(HOODIE_TABLE_PARTITION_COLUMNS_PROP)) {
-      return Option.of(Arrays.stream(getString(HOODIE_TABLE_PARTITION_COLUMNS_PROP).split(","))
+  public Option<String[]> getRecordKeyFields() {
+    if (contains(HOODIE_TABLE_RECORDKEY_FIELDS)) {
+      return Option.of(Arrays.stream(getString(HOODIE_TABLE_RECORDKEY_FIELDS).split(","))
+          .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[]{}));
+    }
+    return Option.empty();
+  }
+
+  public Option<String[]> getPartitionFields() {
+    if (contains(HOODIE_TABLE_PARTITION_FIELDS_PROP)) {
+      return Option.of(Arrays.stream(getString(HOODIE_TABLE_PARTITION_FIELDS_PROP).split(","))
           .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[]{}));
     }
     return Option.empty();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e07810931..01dd04082 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -619,7 +619,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Integer timelineLayoutVersion;
     private String baseFileFormat;
     private String preCombineField;
-    private String partitionColumns;
+    private String partitionFields;
     private String bootstrapIndexClass;
     private String bootstrapBasePath;
     private Boolean populateMetaFields;
@@ -681,8 +681,8 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
-    public PropertyBuilder setPartitionColumns(String partitionColumns) {
-      this.partitionColumns = partitionColumns;
+    public PropertyBuilder setPartitionFields(String partitionFields) {
+      this.partitionFields = partitionFields;
       return this;
     }
 
@@ -741,9 +741,9 @@ public class HoodieTableMetaClient implements Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP)) {
         setPreCombineField(hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP));
       }
-      if (hoodieConfig.contains(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS_PROP)) {
-        setPartitionColumns(
-            hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS_PROP));
+      if (hoodieConfig.contains(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP)) {
+        setPartitionFields(
+            hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP));
       }
       if (hoodieConfig.contains(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS)) {
         setRecordKeyFields(hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS));
@@ -801,8 +801,8 @@ public class HoodieTableMetaClient implements Serializable {
         tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP, preCombineField);
       }
 
-      if (null != partitionColumns) {
-        tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS_PROP, partitionColumns);
+      if (null != partitionFields) {
+        tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP, partitionFields);
       }
       if (null != recordKeyFields) {
         tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS, recordKeyFields);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 9b3c7ac15..91a2e472e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -218,9 +218,10 @@ public class StreamerUtil {
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
+          .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
           .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS))
           .setArchiveLogFolder(HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue())
-          .setPartitionColumns(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
+          .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
           .setPreCombineField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
           .setTimelineLayoutVersion(1)
           .initTable(hadoopConf, basePath);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 2f6037c3d..7459f3a2b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -60,9 +60,9 @@ public class TestStreamerUtil {
         .setBasePath(tempFile.getAbsolutePath())
         .setConf(new org.apache.hadoop.conf.Configuration())
         .build();
-    assertTrue(metaClient1.getTableConfig().getPartitionColumns().isPresent(),
+    assertTrue(metaClient1.getTableConfig().getPartitionFields().isPresent(),
         "Missing partition columns in the hoodie.properties.");
-    assertArrayEquals(metaClient1.getTableConfig().getPartitionColumns().get(), new String[] { "p0", "p1" });
+    assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] { "p0", "p1" });
     assertEquals(metaClient1.getTableConfig().getPreCombineField(), "ts");
 
     // Test for non-partitioned table.
@@ -73,7 +73,7 @@ public class TestStreamerUtil {
         .setBasePath(tempFile.getAbsolutePath())
         .setConf(new org.apache.hadoop.conf.Configuration())
         .build();
-    assertFalse(metaClient2.getTableConfig().getPartitionColumns().isPresent());
+    assertFalse(metaClient2.getTableConfig().getPartitionFields().isPresent());
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 3aa41eb8e..3c8bac749 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -90,7 +90,7 @@ case class HoodieFileIndex(
    */
   private lazy val _partitionSchemaFromProperties: StructType = {
     val tableConfig = metaClient.getTableConfig
-    val partitionColumns = tableConfig.getPartitionColumns
+    val partitionColumns = tableConfig.getPartitionFields
     val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
 
     if (partitionColumns.isPresent) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 28c0d0f02..409f5c3a6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -119,15 +119,17 @@ object HoodieSparkSqlWriter {
         val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
+        val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setTableName(tblName)
+          .setRecordKeyFields(recordKeyFields)
           .setBaseFileFormat(baseFileFormat)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
           .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
-          .setPartitionColumns(partitionColumns)
+          .setPartitionFields(partitionColumns)
           .setPopulateMetaFields(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
             HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
@@ -302,15 +304,18 @@ object HoodieSparkSqlWriter {
     if (!tableExists) {
       val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
       val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
+      val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+
       HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.valueOf(tableType))
         .setTableName(tableName)
+        .setRecordKeyFields(recordKeyFields)
        .setArchiveLogFolder(archiveLogFolder)
        .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_OPT_KEY))
        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
        .setBootstrapIndexClass(bootstrapIndexClass)
        .setBootstrapBasePath(bootstrapBasePath)
-        .setPartitionColumns(partitionColumns)
+        .setPartitionFields(partitionColumns)
        .initTable(sparkContext.hadoopConfiguration, path)
     }
 
@@ -471,6 +476,9 @@ object HoodieSparkSqlWriter {
     hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
     hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
     hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
+
+    hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
+    hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
     hiveSyncConfig
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 64eff9ad7..dce34b377 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -27,6 +28,8 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
 
+import scala.collection.JavaConverters._
+
 /**
  * Command for create table as query statement.
  */
@@ -71,8 +74,14 @@ case class CreateHoodieTableAsSelectCommand(
     // Execute the insert query
     try {
       // Set if sync as a managed table.
-      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key(),
+      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key,
         (table.tableType == CatalogTableType.MANAGED).toString)
+      // Sync the options to hive serde properties
+      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key,
+        ConfigUtils.configToString(table.storage.properties.asJava))
+      // Sync the table properties to hive
+      sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key,
+        ConfigUtils.configToString(table.properties.asJava))
       val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
         mode == SaveMode.Overwrite, refreshTable = false)
       if (success) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 8aabe0008..462dfcf99 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -103,9 +103,9 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
       val tableSchema = avroSchema.map(SchemaConverters.toSqlType(_).dataType
         .asInstanceOf[StructType])
 
-      // Get options from the external table
+      // Get options from the external table and append with the options in ddl.
       val options = HoodieOptionConfig.mappingTableConfigToSqlOption(
-        metaClient.getTableConfig.getProps.asScala.toMap)
+        metaClient.getTableConfig.getProps.asScala.toMap) ++ table.storage.properties
 
       val userSpecifiedSchema = table.schema
       if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
@@ -329,7 +329,7 @@ object CreateHoodieTableCommand extends Logging {
       .fromProperties(properties)
       .setTableName(tableName)
       .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
-      .setPartitionColumns(table.partitionColumnNames.mkString(","))
+      .setPartitionFields(table.partitionColumnNames.mkString(","))
       .initTable(conf, location)
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 24723849a..478e0eec8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.common.table.HoodieTableMetaClient
+
 class TestMergeIntoTable2 extends TestHoodieSqlBase {
 
   test("Test MergeInto for MOR table 2") {
@@ -135,4 +137,39 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
       )("assertion failed: Target table's field(price) cannot be the right-value of the update clause for MOR table.")
     }
   }
+
+  test("Test Merge Into CTAS Table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |options(primaryKey = 'id')
+           |location '${tmp.getCanonicalPath}'
+           |as
+           |select 1 as id, 'a1' as name
+           |""".stripMargin
+      )
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tmp.getCanonicalPath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      // check record key in hoodie.properties
+      assertResult("id")(metaClient.getTableConfig.getRecordKeyFields.get().mkString(","))
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 1 as s_id, 'a1_1' as name
+           |) s0
+           |on h0.id = s0.s_id
+           |when matched then update set *
+           |""".stripMargin
+      )
+      checkAnswer(s"select id, name from $tableName")(
+        Seq(1, "a1_1")
+      )
+    }
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7d2feb005..a08293d0c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -255,7 +255,7 @@ public class DeltaSync implements Serializable {
           .setArchiveLogFolder(HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue())
           .setPayloadClassName(cfg.payloadClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
-          .setPartitionColumns(partitionColumns)
+          .setPartitionFields(partitionColumns)
          .setPreCombineField(cfg.sourceOrderingField)
          .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
 
@@ -354,7 +354,7 @@ public class DeltaSync implements Serializable {
          .setArchiveLogFolder(HOODIE_ARCHIVELOG_FOLDER_PROP.defaultValue())
          .setPayloadClassName(cfg.payloadClassName)
          .setBaseFileFormat(cfg.baseFileFormat)
-          .setPartitionColumns(partitionColumns)
+          .setPartitionFields(partitionColumns)
          .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
     }
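
Reviewer note: a minimal sketch, not part of the patch, of how the renamed accessors read back from hoodie.properties once initTable has run. The class name and base path are hypothetical; the builder and accessor calls mirror the ones used in the tests above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class TableConfigReadExample {
  public static void main(String[] args) {
    // Load the table's hoodie.properties; "/tmp/hudi_trips" is a hypothetical base path.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath("/tmp/hudi_trips")
        .setConf(new Configuration())
        .build();
    HoodieTableConfig tableConfig = metaClient.getTableConfig();

    // Both accessors return Option.empty() when the property was never written,
    // e.g. tables created before this change, or non-partitioned tables in the
    // case of getPartitionFields().
    if (tableConfig.getRecordKeyFields().isPresent()) {
      System.out.println("record key fields: "
          + String.join(",", tableConfig.getRecordKeyFields().get()));
    }
    if (tableConfig.getPartitionFields().isPresent()) {
      System.out.println("partition fields: "
          + String.join(",", tableConfig.getPartitionFields().get()));
    }
  }
}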