1
0

[HUDI-4474] Infer metasync configs (#6217)

- Infer repeated meta sync configs from the corresponding original configs:
  - `META_SYNC_BASE_FILE_FORMAT`
    - infer from `org.apache.hudi.common.table.HoodieTableConfig.BASE_FILE_FORMAT`
  - `META_SYNC_ASSUME_DATE_PARTITION`
    - infer from `org.apache.hudi.common.config.HoodieMetadataConfig.ASSUME_DATE_PARTITIONING`
  - `META_SYNC_DECODE_PARTITION`
    - infer from `org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING`
  - `META_SYNC_USE_FILE_LISTING_FROM_METADATA`
    - infer from `org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE`

As proposed in https://github.com/apache/hudi/blob/master/rfc/rfc-55/rfc-55.md#compatible-changes
This commit is contained in:
Shiyan Xu
2022-07-26 04:58:31 -05:00
committed by GitHub
parent 74d7b4d751
commit 1ea1e659c2
4 changed files with 107 additions and 35 deletions

View File

@@ -172,6 +172,11 @@ public class HoodieConfig implements Serializable {
.orElseGet(() -> Boolean.parseBoolean(configProperty.defaultValue().toString()));
}
/**
 * Reads the raw value of the given config property and interprets it as a boolean.
 *
 * @param configProperty the property to look up
 * @param defaultVal     value returned when the property is not set
 * @return the parsed boolean value, or {@code defaultVal} when absent
 */
public <T> boolean getBooleanOrDefault(ConfigProperty<T> configProperty, boolean defaultVal) {
  // Any non-null raw value is stringified and parsed; absence yields the caller's default.
  return getRawValue(configProperty)
      .map(v -> Boolean.parseBoolean(v.toString()))
      .orElse(defaultVal);
}
public <T> Long getLong(ConfigProperty<T> configProperty) {
Option<Object> rawValue = getRawValue(configProperty);
return rawValue.map(v -> Long.parseLong(v.toString())).orElse(null);

View File

@@ -426,12 +426,8 @@ object DataSourceWriteOptions {
@Deprecated
val METASTORE_URIS: ConfigProperty[String] = HiveSyncConfigHolder.METASTORE_URIS
@Deprecated
val hivePartitionFieldsInferFunc: JavaFunction[HoodieConfig, Option[String]] = HoodieSyncConfig.PARTITION_FIELDS_INFERENCE_FUNCTION
@Deprecated
val HIVE_PARTITION_FIELDS: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_PARTITION_FIELDS
@Deprecated
val hivePartitionExtractorInferFunc: JavaFunction[HoodieConfig, Option[String]] = HoodieSyncConfig.PARTITION_EXTRACTOR_CLASS_FUNCTION
@Deprecated
val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS
@Deprecated
val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -34,8 +35,8 @@ import org.apache.hadoop.fs.FileSystem;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_NAME_KEY;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY;
@@ -72,57 +73,51 @@ public class HoodieSyncConfig extends HoodieConfig {
public static final ConfigProperty<String> META_SYNC_BASE_FILE_FORMAT = ConfigProperty
.key("hoodie.datasource.hive_sync.base_file_format")
.defaultValue("PARQUET")
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HoodieTableConfig.BASE_FILE_FORMAT)))
.withDocumentation("Base file format for the sync.");
// If partition fields are not explicitly provided, obtain from the KeyGeneration Configs
public static final Function<HoodieConfig, Option<String>> PARTITION_FIELDS_INFERENCE_FUNCTION = cfg -> {
if (cfg.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
return Option.of(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME));
} else {
return Option.empty();
}
};
public static final ConfigProperty<String> META_SYNC_PARTITION_FIELDS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_fields")
.defaultValue("")
.withInferFunction(PARTITION_FIELDS_INFERENCE_FUNCTION)
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)))
.withDocumentation("Field in the table to use for determining hive partition columns.");
// If partition value extraction class is not explicitly provided, configure based on the partition fields.
public static final Function<HoodieConfig, Option<String>> PARTITION_EXTRACTOR_CLASS_FUNCTION = cfg -> {
if (!cfg.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
} else {
int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
if (numOfPartFields == 1
&& cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
&& cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
} else {
return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
}
}
};
public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_extractor_class")
.defaultValue("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
.withInferFunction(PARTITION_EXTRACTOR_CLASS_FUNCTION)
.withInferFunction(cfg -> {
if (cfg.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)) {
int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
if (numOfPartFields == 1
&& cfg.contains(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
&& cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE).equals("true")) {
return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
} else {
return Option.of("org.apache.hudi.hive.MultiPartKeysValueExtractor");
}
} else {
return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
}
})
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
+ "default 'SlashEncodedDayPartitionValueExtractor'.");
public static final ConfigProperty<String> META_SYNC_ASSUME_DATE_PARTITION = ConfigProperty
.key("hoodie.datasource.hive_sync.assume_date_partitioning")
.defaultValue("false")
.withDocumentation("Assume partitioning is yyyy/mm/dd");
.defaultValue(HoodieMetadataConfig.ASSUME_DATE_PARTITIONING.defaultValue())
.withInferFunction(cfg -> Option.ofNullable(cfg.getString(HoodieMetadataConfig.ASSUME_DATE_PARTITIONING)))
.withDocumentation("Assume partitioning is yyyy/MM/dd");
public static final ConfigProperty<Boolean> META_SYNC_DECODE_PARTITION = ConfigProperty
.key("hoodie.meta.sync.decode_partition")
.defaultValue(false) // TODO infer from url encode option
.withDocumentation("");
.defaultValue(false)
.withInferFunction(cfg -> Option.ofNullable(cfg.getBoolean(HoodieTableConfig.URL_ENCODE_PARTITIONING)))
.withDocumentation("If true, meta sync will url-decode the partition path, as it is deemed as url-encoded. Default to false.");
public static final ConfigProperty<Boolean> META_SYNC_USE_FILE_LISTING_FROM_METADATA = ConfigProperty
.key("hoodie.meta.sync.metadata_file_listing")
.defaultValue(HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
.defaultValue(DEFAULT_METADATA_ENABLE_FOR_READERS)
.withInferFunction(cfg -> Option.of(cfg.getBooleanOrDefault(HoodieMetadataConfig.ENABLE, DEFAULT_METADATA_ENABLE_FOR_READERS)))
.withDocumentation("Enable the internal metadata table for file listing for syncing with metastores");
public static final ConfigProperty<String> META_SYNC_CONDITIONAL_SYNC = ConfigProperty

View File

@@ -19,16 +19,26 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DECODE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TestHoodieSyncConfig {
@@ -52,4 +62,70 @@ class TestHoodieSyncConfig {
assertEquals("default", config3.getString(META_SYNC_DATABASE_NAME));
assertEquals("unknown", config3.getString(META_SYNC_TABLE_NAME));
}
@Test
void testInferBaseFileFormat() {
  // When the table config declares a base file format, the sync config infers it.
  Properties tableProps = new Properties();
  tableProps.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), "ORC");
  HoodieSyncConfig withOrc = new HoodieSyncConfig(tableProps, new Configuration());
  assertEquals("ORC", withOrc.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT));

  // Without any table-level setting, the documented default applies.
  HoodieSyncConfig withDefaults = new HoodieSyncConfig(new Properties(), new Configuration());
  assertEquals("PARQUET", withDefaults.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT));
}
@Test
void testInferPartitionFields() {
  // Sync partition fields are inferred from the key generator's partition path fields.
  Properties keyGenProps = new Properties();
  keyGenProps.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
  HoodieSyncConfig config = new HoodieSyncConfig(keyGenProps, new Configuration());
  assertEquals("foo,bar", config.getStringOrDefault(META_SYNC_PARTITION_FIELDS));
}
@Test
void testInferPartitionExtractorClass() { // renamed: fixed "Partiton" typo; JUnit 5 discovers tests via @Test, not by name
  // Multiple partition fields -> MultiPartKeysValueExtractor is inferred.
  Properties props1 = new Properties();
  props1.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo,bar");
  HoodieSyncConfig config1 = new HoodieSyncConfig(props1, new Configuration());
  assertEquals("org.apache.hudi.hive.MultiPartKeysValueExtractor",
      config1.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));

  // Single field with hive-style partitioning -> HiveStylePartitionValueExtractor.
  Properties props2 = new Properties();
  props2.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "foo");
  props2.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), "true");
  HoodieSyncConfig config2 = new HoodieSyncConfig(props2, new Configuration());
  assertEquals("org.apache.hudi.hive.HiveStylePartitionValueExtractor",
      config2.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));

  // No partition fields at all -> NonPartitionedExtractor.
  HoodieSyncConfig config3 = new HoodieSyncConfig(new Properties(), new Configuration());
  assertEquals("org.apache.hudi.hive.NonPartitionedExtractor",
      config3.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
}
@Test
void testInferAssumeDatePartition() {
  // The assume-date-partitioning flag is mirrored from the metadata config.
  Properties metadataProps = new Properties();
  metadataProps.setProperty(HoodieMetadataConfig.ASSUME_DATE_PARTITIONING.key(), "true");
  HoodieSyncConfig config = new HoodieSyncConfig(metadataProps, new Configuration());
  assertEquals("true", config.getString(META_SYNC_ASSUME_DATE_PARTITION));
}
@Test
void testInferDecodePartition() {
  // URL-encoded partitioning in the table config implies decoding during sync.
  Properties encodingProps = new Properties();
  encodingProps.setProperty(HoodieTableConfig.URL_ENCODE_PARTITIONING.key(), "true");
  HoodieSyncConfig config = new HoodieSyncConfig(encodingProps, new Configuration());
  assertTrue(config.getBoolean(META_SYNC_DECODE_PARTITION));
}
@Test
void testInferUseFileListingFromMetadata() {
  // With no metadata config at all, the reader-side default is used.
  HoodieSyncConfig defaults = new HoodieSyncConfig(new Properties(), new Configuration());
  assertEquals(DEFAULT_METADATA_ENABLE_FOR_READERS,
      defaults.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA));

  // Enabling the metadata table turns on metadata-based file listing for sync.
  Properties metadataProps = new Properties();
  metadataProps.setProperty(HoodieMetadataConfig.ENABLE.key(), "true");
  HoodieSyncConfig enabled = new HoodieSyncConfig(metadataProps, new Configuration());
  assertTrue(enabled.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA));
}
}