From 3195f51562b8cd7e62a46d41ae91e6d25be77988 Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Tue, 5 Apr 2022 16:31:41 +0800
Subject: [PATCH] [HUDI-3748] write and select hudi table when enable
 hoodie.datasource.write.drop.partition.columns (#5201)

---
 .../hudi/cli/commands/TestRepairsCommand.java |  4 +-
 .../org/apache/hudi/avro/HoodieAvroUtils.java | 24 ++++-
 .../hudi/common/table/HoodieTableConfig.java  | 12 +++
 .../common/table/HoodieTableMetaClient.java   | 14 +++
 .../common/table/TableSchemaResolver.java     | 88 +++++++++++++----
 .../HoodieIncompatibleSchemaException.java    |  2 +-
 .../common/table/TestHoodieTableConfig.java   | 10 +-
 .../common/table/TestTableSchemaResolver.java | 70 ++++++++++++++
 .../apache/hudi/BaseFileOnlyRelation.scala    |  5 +-
 .../org/apache/hudi/DataSourceOptions.scala   |  9 +-
 .../org/apache/hudi/HoodieBaseRelation.scala  | 95 ++++++++++++++++---
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  5 +-
 .../hudi/MergeOnReadSnapshotRelation.scala    |  2 +-
 .../hudi/functional/TestCOWDataSource.scala   | 29 +++---
 .../spark/sql/hudi/TestInsertTable.scala      | 33 +++++++
 15 files changed, 335 insertions(+), 67 deletions(-)
 create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
index 27cc31cce..96e0873da 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -51,6 +51,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
 import static org.apache.hudi.common.table.HoodieTableConfig.NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_LAYOUT_VERSION;
@@ -188,11 +189,12 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness {
     Map<String, String> expected = expectProps.entrySet().stream()
         .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
     expected.putIfAbsent(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(tableConfig.getProps())));
+    expected.putIfAbsent(DROP_PARTITION_COLUMNS.key(), String.valueOf(DROP_PARTITION_COLUMNS.defaultValue()));
     assertEquals(expected, result);
 
     // check result
     List<String> allPropsStr = Arrays.asList(NAME.key(), TYPE.key(), VERSION.key(),
-        ARCHIVELOG_FOLDER.key(), TIMELINE_LAYOUT_VERSION.key(), TABLE_CHECKSUM.key());
+        ARCHIVELOG_FOLDER.key(), TIMELINE_LAYOUT_VERSION.key(), TABLE_CHECKSUM.key(), DROP_PARTITION_COLUMNS.key());
     String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
         oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")})
         .toArray(String[][]::new);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 237851c33..1055bd522 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -164,6 +164,18 @@ public class HoodieAvroUtils {
     return reader.read(null, jsonDecoder);
   }
 
+  /**
+   * Returns true if the schema contains a field with the given name.
+   */
+  public static boolean containsFieldInSchema(Schema schema, String fieldName) {
+    try {
+      Field field = schema.getField(fieldName);
+      return field != null;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
   public static boolean isMetadataField(String fieldName) {
     return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
         || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
@@ -324,13 +336,19 @@ public class HoodieAvroUtils {
    * @param newFieldNames Null Field names to be added
    */
   public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
-    List<Field> newFields = schema.getFields().stream()
-        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
+    List<Field> newFields = new ArrayList<>();
     for (String newField : newFieldNames) {
       newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE));
     }
+    return createNewSchemaWithExtraFields(schema, newFields);
+  }
+
+  public static Schema createNewSchemaWithExtraFields(Schema schema, List<Field> newFields) {
+    List<Field> fields = schema.getFields().stream()
+        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
+    fields.addAll(newFields);
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    newSchema.setFields(newFields);
+    newSchema.setFields(fields);
     return newSchema;
   }
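To illustrate how the two new HoodieAvroUtils helpers compose: a minimal Scala sketch (not part of the patch) against a throwaway Avro schema; the record and field names here are illustrative only.

    import org.apache.avro.{JsonProperties, Schema, SchemaBuilder}
    import org.apache.hudi.avro.HoodieAvroUtils
    import scala.collection.JavaConverters._

    // illustrative schema, not from the patch
    val base: Schema = SchemaBuilder.record("rec").fields().requiredString("id").endRecord()
    assert(HoodieAvroUtils.containsFieldInSchema(base, "id"))
    assert(!HoodieAvroUtils.containsFieldInSchema(base, "dt"))

    // createNewSchemaWithExtraFields copies the existing fields and appends the new ones
    val dt = new Schema.Field("dt", Schema.create(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)
    val widened = HoodieAvroUtils.createNewSchemaWithExtraFields(base, List(dt).asJava)
    assert(widened.getField("dt") != null)

Note that appendNullSchemaFields is now just a thin wrapper that builds the null metadata fields and delegates to createNewSchemaWithExtraFields, which is the piece the schema resolver below reuses.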
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index bfcec84ce..c158372f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -196,6 +196,11 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). "
           + "If false (default) partition metafiles are saved as properties files.");
 
+  public static final ConfigProperty<Boolean> DROP_PARTITION_COLUMNS = ConfigProperty
+      .key("hoodie.datasource.write.drop.partition.columns")
+      .defaultValue(false)
+      .withDocumentation("When set to true, the partition columns are not written into hudi. By default, false.");
+
   public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
   public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -426,6 +431,9 @@ public class HoodieTableConfig extends HoodieConfig {
       if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
         HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
       }
+
+      hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
+
       storeProperties(hoodieConfig.getProps(), outputStream);
     }
   }
@@ -599,6 +607,10 @@ public class HoodieTableConfig extends HoodieConfig {
     return getString(URL_ENCODE_PARTITIONING);
   }
 
+  public Boolean isDropPartitionColumns() {
+    return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
+  }
+
   /**
    * Read the table checksum.
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 4e04ad9db..38b5509cd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -700,6 +700,7 @@ public class HoodieTableMetaClient implements Serializable { private Boolean urlEncodePartitioning; private HoodieTimelineTimeZone commitTimeZone; private Boolean partitionMetafileUseBaseFormat; + private Boolean dropPartitionColumnsWhenWrite; /** * Persist the configs that is written at the first time, and should not be changed. @@ -819,6 +820,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionColumnsWhenWrite) { + this.dropPartitionColumnsWhenWrite = dropPartitionColumnsWhenWrite; + return this; + } + public PropertyBuilder set(String key, Object value) { if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) { this.others.put(key, value); @@ -917,6 +923,10 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) { setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)); } + + if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) { + setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS)); + } return this; } @@ -998,6 +1008,10 @@ public class HoodieTableMetaClient implements Serializable { if (null != partitionMetafileUseBaseFormat) { tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString()); } + + if (null != dropPartitionColumnsWhenWrite) { + tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite)); + } return tableConfig.getProps(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 854eef175..262157a8a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -18,6 +18,16 @@ package org.apache.hudi.common.table; +import org.apache.avro.JsonProperties; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; + import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -35,6 +45,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.io.storage.HoodieOrcReader; @@ -42,15 +53,9 @@ import org.apache.hudi.internal.schema.InternalSchema; 
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
 
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -58,6 +63,9 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * Helper class to read schema from data files and log files and to convert it between different formats.
@@ -159,23 +167,67 @@ public class TableSchemaResolver {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    Option<String[]> partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields();
+    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema);
     }
+    return schema;
+  }
+
+  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> partitionFieldsOpt, Schema originSchema) {
+    // When hoodie.datasource.write.drop.partition.columns is true, the partition columns are not persisted
+    // in the data files, so a schema parsed from the data files carries no partition fields.
+    // In that case, recreate the partition fields here, using StringType as their data type.
+    Schema schema = originSchema;
+    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 0) {
+      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
+
+      final Schema schema0 = originSchema;
+      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
+          pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt)
+      );
+      boolean hasPartitionColInSchema = partitionFields.stream().anyMatch(
+          pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
+      );
+      if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
+        throw new HoodieIncompatibleSchemaException(
+            "Not supported: Partial partition fields are still in the schema "
+                + "when hoodie.datasource.write.drop.partition.columns is enabled");
+      }
+
+      if (hasPartitionColNotInSchema) {
+        // When hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, none of the
+        // partition columns are in originSchema, so create and add all of them.
+        List<Schema.Field> newFields = new ArrayList<>();
+        for (String partitionField : partitionFields) {
+          newFields.add(new Schema.Field(
+              partitionField, Schema.create(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
+        }
+        schema = HoodieAvroUtils.createNewSchemaWithExtraFields(schema, newFields);
+      }
+    }
+    return schema;
   }
 
   /**
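To illustrate the schema-recreation rule above: a minimal Scala sketch (not part of the patch), assuming a trivial Avro schema built on the spot with illustrative field names. A configured partition field that is missing from the schema is re-added as a string field; with no partition fields configured, the schema passes through unchanged.

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.hudi.common.table.TableSchemaResolver
    import org.apache.hudi.common.util.Option

    // a data-file schema that carries only the data columns (partition column dropped on write)
    val dataSchema: Schema = SchemaBuilder.record("rec").fields()
      .requiredString("id").requiredDouble("price").endRecord()

    // the "dt" partition column is absent from the schema, so it is re-added as a string field
    val withPartition = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(
      Option.of(Array("dt")), dataSchema)
    assert(withPartition.getField("dt").schema().getType == Schema.Type.STRING)

    // no partition fields configured: the schema is returned unchanged
    assert(TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(
      Option.empty[Array[String]](), dataSchema) == dataSchema)

The mixed case (some partition fields in the schema, some not) is the one rejected with HoodieIncompatibleSchemaException, since there is no safe way to tell which columns were actually dropped on write.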
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
index 579ae21d3..a739af679 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java
@@ -21,7 +21,7 @@ package org.apache.hudi.exception;
 
 /**
  * Exception for incompatible schema.
*/ -public class HoodieIncompatibleSchemaException extends Exception { +public class HoodieIncompatibleSchemaException extends RuntimeException { public HoodieIncompatibleSchemaException(String msg, Throwable e) { super(msg, e); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java index f21d8e6dc..0defefe2e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java @@ -64,7 +64,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { public void testCreate() throws IOException { assertTrue(fs.exists(new Path(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE))); HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null); - assertEquals(5, config.getProps().size()); + assertEquals(6, config.getProps().size()); } @Test @@ -77,7 +77,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { assertTrue(fs.exists(cfgPath)); assertFalse(fs.exists(backupCfgPath)); HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null); - assertEquals(6, config.getProps().size()); + assertEquals(7, config.getProps().size()); assertEquals("test-table2", config.getTableName()); assertEquals("new_field", config.getPreCombineField()); } @@ -90,7 +90,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { assertTrue(fs.exists(cfgPath)); assertFalse(fs.exists(backupCfgPath)); HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null); - assertEquals(4, config.getProps().size()); + assertEquals(5, config.getProps().size()); assertNull(config.getProps().getProperty("hoodie.invalid.config")); assertFalse(config.getProps().contains(HoodieTableConfig.ARCHIVELOG_FOLDER.key())); } @@ -114,7 +114,7 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { assertFalse(fs.exists(cfgPath)); assertTrue(fs.exists(backupCfgPath)); config = new HoodieTableConfig(fs, metaPath.toString(), null); - assertEquals(5, config.getProps().size()); + assertEquals(6, config.getProps().size()); } @ParameterizedTest @@ -132,6 +132,6 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness { assertTrue(fs.exists(cfgPath)); assertFalse(fs.exists(backupCfgPath)); config = new HoodieTableConfig(fs, metaPath.toString(), null); - assertEquals(5, config.getProps().size()); + assertEquals(6, config.getProps().size()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java new file mode 100644 index 000000000..59a24a79f --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.avro.Schema;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestTableSchemaResolver {
+
+  @Test
+  public void testRecreateSchemaWhenDropPartitionColumns() {
+    Schema originSchema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+    // case1: no partition fields are configured
+    Option<String[]> emptyPartitionFieldsOpt = Option.empty();
+    Schema s1 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(emptyPartitionFieldsOpt, originSchema);
+    assertEquals(originSchema, s1);
+
+    // case2: the configured partition fields are empty
+    String[] pts1 = new String[0];
+    Schema s2 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts1), originSchema);
+    assertEquals(originSchema, s2);
+
+    // case3: partition_path is in originSchema
+    String[] pts2 = {"partition_path"};
+    Schema s3 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts2), originSchema);
+    assertEquals(originSchema, s3);
+
+    // case4: user_partition is not in originSchema
+    String[] pts3 = {"user_partition"};
+    Schema s4 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts3), originSchema);
+    assertNotEquals(originSchema, s4);
+    assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
+    Schema.Field f = s4.getField("user_partition");
+    assertEquals("string", f.schema().getType().getName());
+
+    // case5: user_partition is not in originSchema, but partition_path is
+    String[] pts4 = {"user_partition", "partition_path"};
+    try {
+      TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts4), originSchema);
+    } catch (HoodieIncompatibleSchemaException e) {
+      assertTrue(e.getMessage().contains("Partial partition fields are still in the schema"));
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 34c4e5c5a..525292da6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,10 +20,11 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.common.table.HoodieTableMetaClient
+
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -91,7 +92,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
       sparkSession = sparkSession,
       file = file,
       // TODO clarify why this is required
-      partitionValues = InternalRow.empty
+      partitionValues = getPartitionColumnsAsInternalRow(file)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index f86e55b43..432988962 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -471,11 +471,10 @@ object DataSourceWriteOptions {
     .sinceVersion("0.9.0")
     .withDocumentation("This class is used by kafka client to deserialize the records")
 
-  val DROP_PARTITION_COLUMNS: ConfigProperty[String] = ConfigProperty
-    .key("hoodie.datasource.write.drop.partition.columns")
-    .defaultValue("false")
-    .withDocumentation("When set to true, will not write the partition columns into hudi. " +
-      "By default, false.")
+  val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty
+    .key(HoodieTableConfig.DROP_PARTITION_COLUMNS.key())
+    .defaultValue(HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue().booleanValue())
+    .withDocumentation(HoodieTableConfig.DROP_PARTITION_COLUMNS.doc())
 
   /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
   @Deprecated
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 072beefcf..f79ba96d8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -19,10 +19,12 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
+
 import org.apache.hudi.HoodieBaseRelation.getPartitionPath
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.SerializableConfiguration
@@ -33,27 +35,29 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.hudi.metadata.HoodieTableMetadata
+
 import org.apache.spark.TaskContext
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
+import org.apache.spark.unsafe.types.UTF8String
 
 import java.io.Closeable
+import java.net.URI
+
 import scala.collection.JavaConverters._
 import scala.util.Try
+import scala.util.control.NonFatal
 
 trait HoodieFileSplit {}
@@ -140,6 +144,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
 
+  /**
+   * If true, the schemas used to create the file readers have to be adjusted, because the
+   * partition columns are not stored in the data files.
+   */
+  protected val dropPartitionColumnsWhenWrite: Boolean =
+    metaClient.getTableConfig.isDropPartitionColumns && partitionColumns.nonEmpty
+
   /**
    * NOTE: PLEASE READ THIS CAREFULLY
    *
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = StructType(Nil)
-    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+      // when hoodie.datasource.write.drop.partition.columns is true, the partition columns
+      // are not persisted in the data files
+      StructType(partitionColumns.map(StructField(_, StringType)))
+    } else {
+      StructType(Nil)
+    }
+    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
+    val dataSchema = if (dropPartitionColumnsWhenWrite) {
+      val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        dataStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
+      )
+    } else {
+      tableSchema
+    }
+    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
+      val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        requiredStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString()
+      )
+    } else {
+      HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+    }
 
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
     if (fileSplits.nonEmpty)
-      composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
+      composeRDD(fileSplits, partitionSchema, dataSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
     else
       sparkSession.sparkContext.emptyRDD
   }
@@ -286,8 +319,16 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
-    requestedColumns ++ missing
+    if (dropPartitionColumnsWhenWrite) {
+      if (requestedColumns.isEmpty) {
+        mandatoryColumns.toArray
+      } else {
+        requestedColumns
+      }
+    } else {
+      val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+      requestedColumns ++ missing
+    }
   }
 
   protected def getTableState: HoodieTableState = {
@@ -308,6 +349,38 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
     sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
   }
+
+  /**
+   * When hoodie.datasource.write.drop.partition.columns is enabled, the partition values have to be
+   * reconstructed into an InternalRow and handed to the parquet reader for each file, so that the
+   * partition columns can still be queried.
+   */
+  protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
+    try {
+      val tableConfig = metaClient.getTableConfig
+      if (dropPartitionColumnsWhenWrite) {
+        val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
+        val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
+        if (hiveStylePartitioningEnabled) {
+          val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
+          InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString))
+        } else {
+          if (partitionColumns.length == 1) {
+            InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath)))
+          } else {
+            val parts = relativePath.split("/")
+            assert(parts.size == partitionColumns.length)
+            InternalRow.fromSeq(parts.map(UTF8String.fromString))
+          }
+        }
+      } else {
+        InternalRow.empty
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}")
+        InternalRow.empty
+    }
+  }
 }
 
 object HoodieBaseRelation {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7dbc9997c..f1948d80c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -160,6 +160,7 @@ object HoodieSparkSqlWriter {
         .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
         .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
         .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
+        .setDropPartitionColumnsWhenWrite(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
         .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
         .initTable(sparkContext.hadoopConfiguration, path)
       tableConfig = tableMetaClient.getTableConfig
@@ -501,8 +502,8 @@ object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
       String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
-    val dropPartitionColumns =
-      parameters.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue()).toBoolean
+    val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
+      .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
     // register classes & schemas
     val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
     sparkContext.getConf.registerKryoClasses(
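To make the path-to-row reconstruction above concrete: a minimal Scala sketch (not part of the patch) that mirrors, rather than reuses, the reader's logic, with hypothetical partition paths; the real implementation relies on PartitioningUtils.parsePathFragment for the hive-style case and on positional matching otherwise.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // hypothetical partition columns and relative paths under the table base path
    val partitionColumns = Seq("region", "dt")

    // hive-style layout: column names are embedded in the path, so values can be looked up by name
    val hiveSpec = "region=EU/dt=2022-04-05".split("/")
      .map(_.split("=", 2)).map(kv => kv(0) -> kv(1)).toMap
    val rowFromHive = InternalRow.fromSeq(partitionColumns.map(hiveSpec).map(UTF8String.fromString))

    // non-hive-style layout: path segments are matched to the configured columns by position
    val rowFromPlain = InternalRow.fromSeq("EU/2022-04-05".split("/").toSeq.map(UTF8String.fromString))

This is also why the non-hive-style branch asserts that the number of path segments equals the number of partition columns: with the column values dropped from the data files, the path is the only remaining source of truth.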
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 767a96994..d85788e25 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -127,7 +127,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
       val partitionedBaseFile = baseFile.map { file =>
         val filePath = getFilePath(file.getFileStatus.getPath)
-        PartitionedFile(InternalRow.empty, filePath, 0, file.getFileLen)
+        PartitionedFile(getPartitionColumnsAsInternalRow(file.getFileStatus), filePath, 0, file.getFileLen)
       }
 
       HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 8c9e9daf8..000004ace 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -749,8 +749,17 @@ class TestCOWDataSource extends HoodieClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) {
-    val resultContainPartitionColumn = copyOnWriteTableSelect(enableDropPartitionColumns)
-    assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
+    val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key, enableDropPartitionColumns)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath)
+    assertEquals(100, snapshotDF1.count())
+    assertEquals(3, snapshotDF1.select("partition").distinct().count())
   }
 
   @Test
@@ -863,22 +872,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(500, hoodieIncViewDF.count())
   }
 
-  def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
-    val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
-    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-    inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key, enableDropPartitionColumns)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-    val snapshotDF1 = spark.read.format("org.apache.hudi")
-      .load(basePath + "/*/*/*/*")
-    snapshotDF1.registerTempTable("tmptable")
-    val result = spark.sql("select * from tmptable limit 1").collect()(0)
-    result.schema.contains(new 
StructField("partition", StringType, true)) - } - @Test def testWriteSmallPrecisionDecimalTable(): Unit = { val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index b186381c2..3141208db 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -630,4 +630,37 @@ class TestInsertTable extends TestHoodieSqlBase { } } } + + test("Test enable hoodie.datasource.write.drop.partition.columns when write") { + spark.sql("set hoodie.sql.bulk.insert.enable = false") + Seq("mor", "cow").foreach { tableType => + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + | ) using hudi + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | type = '$tableType', + | hoodie.datasource.write.drop.partition.columns = 'true' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName partition(dt='2021-12-25') values (1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName partition(dt='2021-12-25') values (2, 'a2', 20, 1000)") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 10, 1000, "2021-12-25"), + Seq(2, "a2", 20, 1000, "2021-12-25") + ) + } + } + } }
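For reference, an end-to-end sketch of the feature from the datasource side (not part of the patch), assuming a DataFrame df with columns id, dt and ts, and an illustrative table name and base path. With the option enabled, the dt partition column is dropped from the data files on write, but is reconstructed from the partition paths at query time.

    import org.apache.spark.sql.SaveMode

    df.write.format("hudi")
      .option("hoodie.table.name", "trips")                           // illustrative table name
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.drop.partition.columns", "true")
      .mode(SaveMode.Overwrite)
      .save("/tmp/trips")                                             // illustrative base path

    // the partition column can still be selected and filtered on read
    spark.read.format("hudi").load("/tmp/trips")
      .select("id", "dt").where("dt = '2021-12-25'").show()

Because the option is now also persisted in hoodie.properties (via HoodieTableConfig.DROP_PARTITION_COLUMNS), readers do not need to pass it again: the relations detect it from the table config and adjust their schemas and PartitionedFile rows accordingly.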