1
0

[HUDI-3748] write and select hudi table when enable hoodie.datasource.write.drop.partition.columns (#5201)

This commit is contained in:
Yann Byron
2022-04-05 16:31:41 +08:00
committed by GitHub
parent 325b3d610a
commit 3195f51562
15 changed files with 335 additions and 67 deletions

View File

@@ -164,6 +164,18 @@ public class HoodieAvroUtils {
return reader.read(null, jsonDecoder);
}
/**
* Returns true if the schema contains a field with the given name.
*/
public static boolean containsFieldInSchema(Schema schema, String fieldName) {
try {
Field field = schema.getField(fieldName);
return field != null;
} catch (Exception e) {
return false;
}
}
public static boolean isMetadataField(String fieldName) {
return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
@@ -324,13 +336,19 @@ public class HoodieAvroUtils {
* @param newFieldNames names of the null fields to be added
*/
public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
List<Field> newFields = schema.getFields().stream()
.map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
List<Field> newFields = new ArrayList<>();
for (String newField : newFieldNames) {
newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE));
}
return createNewSchemaWithExtraFields(schema, newFields);
}
public static Schema createNewSchemaWithExtraFields(Schema schema, List<Field> newFields) {
List<Field> fields = schema.getFields().stream()
.map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
fields.addAll(newFields);
Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
newSchema.setFields(newFields);
newSchema.setFields(fields);
return newSchema;
}

View File

@@ -196,6 +196,11 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). "
+ "If false (default) partition metafiles are saved as properties files.");
public static final ConfigProperty<Boolean> DROP_PARTITION_COLUMNS = ConfigProperty
.key("hoodie.datasource.write.drop.partition.columns")
.defaultValue(false)
.withDocumentation("When set to true, will not write the partition columns into hudi. By default, false.");
public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
@@ -426,6 +431,9 @@ public class HoodieTableConfig extends HoodieConfig {
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
storeProperties(hoodieConfig.getProps(), outputStream);
}
}
@@ -599,6 +607,10 @@ public class HoodieTableConfig extends HoodieConfig {
return getString(URL_ENCODE_PARTITIONING);
}
public Boolean isDropPartitionColumns() {
return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
}
/**
* Read the table checksum.
*/

View File

@@ -700,6 +700,7 @@ public class HoodieTableMetaClient implements Serializable {
private Boolean urlEncodePartitioning;
private HoodieTimelineTimeZone commitTimeZone;
private Boolean partitionMetafileUseBaseFormat;
private Boolean dropPartitionColumnsWhenWrite;
/**
* Persist the configs that is written at the first time, and should not be changed.
@@ -819,6 +820,11 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionColumnsWhenWrite) {
this.dropPartitionColumnsWhenWrite = dropPartitionColumnsWhenWrite;
return this;
}
public PropertyBuilder set(String key, Object value) {
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
this.others.put(key, value);
@@ -917,6 +923,10 @@ public class HoodieTableMetaClient implements Serializable {
if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) {
setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT));
}
if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
}
return this;
}
@@ -998,6 +1008,10 @@ public class HoodieTableMetaClient implements Serializable {
if (null != partitionMetafileUseBaseFormat) {
tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString());
}
if (null != dropPartitionColumnsWhenWrite) {
tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
}
return tableConfig.getProps();
}

View File

@@ -18,6 +18,16 @@
package org.apache.hudi.common.table;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,6 +45,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.io.storage.HoodieOrcReader;
@@ -42,15 +53,9 @@ import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -58,6 +63,9 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Helper class to read schema from data files and log files and to convert it between different formats.
@@ -159,23 +167,67 @@ public class TableSchemaResolver {
* @throws Exception
*/
public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
Schema schema;
Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields);
if (schemaFromCommitMetadata.isPresent()) {
return schemaFromCommitMetadata.get();
}
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
if (schemaFromTableConfig.isPresent()) {
if (includeMetadataFields) {
return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
schema = schemaFromCommitMetadata.get();
} else {
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
if (schemaFromTableConfig.isPresent()) {
if (includeMetadataFields) {
schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
} else {
schema = schemaFromTableConfig.get();
}
} else {
return schemaFromTableConfig.get();
if (includeMetadataFields) {
schema = getTableAvroSchemaFromDataFile();
} else {
schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
}
}
}
if (includeMetadataFields) {
return getTableAvroSchemaFromDataFile();
} else {
return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
Option<String[]> partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields();
if (metaClient.getTableConfig().isDropPartitionColumns()) {
schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema);
}
return schema;
}
public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> partitionFieldsOpt, Schema originSchema) {
// When hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in data files.
// And there is no partition schema if the schema is parsed from data files.
// Here we create the partition fields for this case, using StringType as the data type.
Schema schema = originSchema;
if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 0) {
List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
final Schema schema0 = originSchema;
boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt)
);
boolean hasPartitionColInSchema = partitionFields.stream().anyMatch(
pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
);
if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
throw new HoodieIncompatibleSchemaException(
"Not support: Partial partition fields are still in the schema "
+ "when enable hoodie.datasource.write.drop.partition.columns");
}
if (hasPartitionColNotInSchema) {
// when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, none of the partition
// columns are in originSchema, so we create and add them.
List<Field> newFields = new ArrayList<>();
for (String partitionField: partitionFields) {
newFields.add(new Schema.Field(
partitionField, Schema.create(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
}
schema = HoodieAvroUtils.createNewSchemaWithExtraFields(schema, newFields);
}
}
return schema;
}
/**

View File

@@ -21,7 +21,7 @@ package org.apache.hudi.exception;
/**
* Exception for incompatible schema.
*/
public class HoodieIncompatibleSchemaException extends Exception {
public class HoodieIncompatibleSchemaException extends RuntimeException {
public HoodieIncompatibleSchemaException(String msg, Throwable e) {
super(msg, e);