[HUDI-4124] Add valid check in Spark Datasource configs (#5637)
Co-authored-by: wangzixuan.wzxuan <wangzixuan.wzxuan@bytedance.com>
This commit is contained in:
@@ -44,6 +44,13 @@ import static org.apache.hudi.config.HoodieHBaseIndexConfig.PUT_BATCH_SIZE;
|
|||||||
import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
|
import static org.apache.hudi.config.HoodieHBaseIndexConfig.TABLENAME;
|
||||||
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
|
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKPORT;
|
||||||
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
|
import static org.apache.hudi.config.HoodieHBaseIndexConfig.ZKQUORUM;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.BLOOM;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.BUCKET;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.HBASE;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
|
||||||
|
import static org.apache.hudi.index.HoodieIndex.IndexType.SIMPLE;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indexing related config.
|
* Indexing related config.
|
||||||
@@ -57,7 +64,10 @@ public class HoodieIndexConfig extends HoodieConfig {
|
|||||||
|
|
||||||
public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
|
public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
|
||||||
.key("hoodie.index.type")
|
.key("hoodie.index.type")
|
||||||
|
// Builder#getDefaultIndexType has already set it according to engine type
|
||||||
.noDefaultValue()
|
.noDefaultValue()
|
||||||
|
.withValidValues(HBASE.name(), INMEMORY.name(), BLOOM.name(), GLOBAL_BLOOM.name(),
|
||||||
|
SIMPLE.name(), GLOBAL_SIMPLE.name(), BUCKET.name())
|
||||||
.withDocumentation("Type of index to use. Default is Bloom filter. "
|
.withDocumentation("Type of index to use. Default is Bloom filter. "
|
||||||
+ "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
|
+ "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. "
|
||||||
+ "Bloom filters removes the dependency on a external system "
|
+ "Bloom filters removes the dependency on a external system "
|
||||||
@@ -141,6 +151,7 @@ public class HoodieIndexConfig extends HoodieConfig {
|
|||||||
public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
|
public static final ConfigProperty<String> BLOOM_FILTER_TYPE = ConfigProperty
|
||||||
.key("hoodie.bloom.index.filter.type")
|
.key("hoodie.bloom.index.filter.type")
|
||||||
.defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
|
.defaultValue(BloomFilterTypeCode.DYNAMIC_V0.name())
|
||||||
|
.withValidValues(BloomFilterTypeCode.SIMPLE.name(), BloomFilterTypeCode.DYNAMIC_V0.name())
|
||||||
.withDocumentation("Filter type used. Default is BloomFilterTypeCode.DYNAMIC_V0. "
|
.withDocumentation("Filter type used. Default is BloomFilterTypeCode.DYNAMIC_V0. "
|
||||||
+ "Available values are [BloomFilterTypeCode.SIMPLE , BloomFilterTypeCode.DYNAMIC_V0]. "
|
+ "Available values are [BloomFilterTypeCode.SIMPLE , BloomFilterTypeCode.DYNAMIC_V0]. "
|
||||||
+ "Dynamic bloom filters auto size themselves based on number of keys.");
|
+ "Dynamic bloom filters auto size themselves based on number of keys.");
|
||||||
|
|||||||
@@ -24,7 +24,10 @@ import org.apache.hudi.exception.HoodieException;
|
|||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@@ -48,19 +51,22 @@ public class ConfigProperty<T> implements Serializable {
|
|||||||
|
|
||||||
private final Option<String> deprecatedVersion;
|
private final Option<String> deprecatedVersion;
|
||||||
|
|
||||||
|
private final Set<String> validValues;
|
||||||
|
|
||||||
private final String[] alternatives;
|
private final String[] alternatives;
|
||||||
|
|
||||||
// provide the ability to infer config value based on other configs
|
// provide the ability to infer config value based on other configs
|
||||||
private final Option<Function<HoodieConfig, Option<T>>> inferFunction;
|
private final Option<Function<HoodieConfig, Option<T>>> inferFunction;
|
||||||
|
|
||||||
ConfigProperty(String key, T defaultValue, String doc, Option<String> sinceVersion,
|
ConfigProperty(String key, T defaultValue, String doc, Option<String> sinceVersion,
|
||||||
Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, String... alternatives) {
|
Option<String> deprecatedVersion, Option<Function<HoodieConfig, Option<T>>> inferFunc, Set<String> validValues, String... alternatives) {
|
||||||
this.key = Objects.requireNonNull(key);
|
this.key = Objects.requireNonNull(key);
|
||||||
this.defaultValue = defaultValue;
|
this.defaultValue = defaultValue;
|
||||||
this.doc = doc;
|
this.doc = doc;
|
||||||
this.sinceVersion = sinceVersion;
|
this.sinceVersion = sinceVersion;
|
||||||
this.deprecatedVersion = deprecatedVersion;
|
this.deprecatedVersion = deprecatedVersion;
|
||||||
this.inferFunction = inferFunc;
|
this.inferFunction = inferFunc;
|
||||||
|
this.validValues = validValues;
|
||||||
this.alternatives = alternatives;
|
this.alternatives = alternatives;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -95,33 +101,46 @@ public class ConfigProperty<T> implements Serializable {
|
|||||||
return inferFunction;
|
return inferFunction;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void checkValues(String value) {
|
||||||
|
if (validValues != null && !validValues.isEmpty() && !validValues.contains(value)) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"The value of " + key + " should be one of "
|
||||||
|
+ String.join(",", validValues) + ", but was " + value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public List<String> getAlternatives() {
|
public List<String> getAlternatives() {
|
||||||
return Arrays.asList(alternatives);
|
return Arrays.asList(alternatives);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigProperty<T> withDocumentation(String doc) {
|
public ConfigProperty<T> withDocumentation(String doc) {
|
||||||
Objects.requireNonNull(doc);
|
Objects.requireNonNull(doc);
|
||||||
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
|
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConfigProperty<T> withValidValues(String... validValues) {
|
||||||
|
Objects.requireNonNull(validValues);
|
||||||
|
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, new HashSet<>(Arrays.asList(validValues)), alternatives);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigProperty<T> withAlternatives(String... alternatives) {
|
public ConfigProperty<T> withAlternatives(String... alternatives) {
|
||||||
Objects.requireNonNull(alternatives);
|
Objects.requireNonNull(alternatives);
|
||||||
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, alternatives);
|
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, inferFunction, validValues, alternatives);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigProperty<T> sinceVersion(String sinceVersion) {
|
public ConfigProperty<T> sinceVersion(String sinceVersion) {
|
||||||
Objects.requireNonNull(sinceVersion);
|
Objects.requireNonNull(sinceVersion);
|
||||||
return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, alternatives);
|
return new ConfigProperty<>(key, defaultValue, doc, Option.of(sinceVersion), deprecatedVersion, inferFunction, validValues, alternatives);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigProperty<T> deprecatedAfter(String deprecatedVersion) {
|
public ConfigProperty<T> deprecatedAfter(String deprecatedVersion) {
|
||||||
Objects.requireNonNull(deprecatedVersion);
|
Objects.requireNonNull(deprecatedVersion);
|
||||||
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, alternatives);
|
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, Option.of(deprecatedVersion), inferFunction, validValues, alternatives);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigProperty<T> withInferFunction(Function<HoodieConfig, Option<T>> inferFunction) {
|
public ConfigProperty<T> withInferFunction(Function<HoodieConfig, Option<T>> inferFunction) {
|
||||||
Objects.requireNonNull(inferFunction);
|
Objects.requireNonNull(inferFunction);
|
||||||
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), alternatives);
|
return new ConfigProperty<>(key, defaultValue, doc, sinceVersion, deprecatedVersion, Option.of(inferFunction), validValues, alternatives);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -156,13 +175,13 @@ public class ConfigProperty<T> implements Serializable {
|
|||||||
|
|
||||||
public <T> ConfigProperty<T> defaultValue(T value) {
|
public <T> ConfigProperty<T> defaultValue(T value) {
|
||||||
Objects.requireNonNull(value);
|
Objects.requireNonNull(value);
|
||||||
ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty());
|
ConfigProperty<T> configProperty = new ConfigProperty<>(key, value, "", Option.empty(), Option.empty(), Option.empty(), Collections.emptySet());
|
||||||
return configProperty;
|
return configProperty;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigProperty<String> noDefaultValue() {
|
public ConfigProperty<String> noDefaultValue() {
|
||||||
ConfigProperty<String> configProperty = new ConfigProperty<>(key, null, "", Option.empty(),
|
ConfigProperty<String> configProperty = new ConfigProperty<>(key, null, "", Option.empty(),
|
||||||
Option.empty(), Option.empty());
|
Option.empty(), Option.empty(), Collections.emptySet());
|
||||||
return configProperty;
|
return configProperty;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,6 +57,7 @@ public class HoodieConfig implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public <T> void setValue(ConfigProperty<T> cfg, String val) {
|
public <T> void setValue(ConfigProperty<T> cfg, String val) {
|
||||||
|
cfg.checkValues(val);
|
||||||
props.setProperty(cfg.key(), val);
|
props.setProperty(cfg.key(), val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -56,6 +56,7 @@ object DataSourceReadOptions {
|
|||||||
.key("hoodie.datasource.query.type")
|
.key("hoodie.datasource.query.type")
|
||||||
.defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
.defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
|
||||||
.withAlternatives("hoodie.datasource.view.type")
|
.withAlternatives("hoodie.datasource.view.type")
|
||||||
|
.withValidValues(QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
.withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
|
.withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
|
||||||
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
|
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
|
||||||
"(obtain latest view, by merging base and (if any) log files)")
|
"(obtain latest view, by merging base and (if any) log files)")
|
||||||
@@ -65,6 +66,7 @@ object DataSourceReadOptions {
|
|||||||
val REALTIME_MERGE: ConfigProperty[String] = ConfigProperty
|
val REALTIME_MERGE: ConfigProperty[String] = ConfigProperty
|
||||||
.key("hoodie.datasource.merge.type")
|
.key("hoodie.datasource.merge.type")
|
||||||
.defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
|
.defaultValue(REALTIME_PAYLOAD_COMBINE_OPT_VAL)
|
||||||
|
.withValidValues(REALTIME_SKIP_MERGE_OPT_VAL, REALTIME_PAYLOAD_COMBINE_OPT_VAL)
|
||||||
.withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
|
.withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
|
||||||
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
|
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
|
||||||
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
|
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
|
||||||
@@ -210,6 +212,23 @@ object DataSourceWriteOptions {
|
|||||||
val OPERATION: ConfigProperty[String] = ConfigProperty
|
val OPERATION: ConfigProperty[String] = ConfigProperty
|
||||||
.key("hoodie.datasource.write.operation")
|
.key("hoodie.datasource.write.operation")
|
||||||
.defaultValue(UPSERT_OPERATION_OPT_VAL)
|
.defaultValue(UPSERT_OPERATION_OPT_VAL)
|
||||||
|
.withValidValues(
|
||||||
|
WriteOperationType.INSERT.value,
|
||||||
|
WriteOperationType.INSERT_PREPPED.value,
|
||||||
|
WriteOperationType.UPSERT.value,
|
||||||
|
WriteOperationType.UPSERT_PREPPED.value,
|
||||||
|
WriteOperationType.BULK_INSERT.value,
|
||||||
|
WriteOperationType.BULK_INSERT_PREPPED.value,
|
||||||
|
WriteOperationType.DELETE.value,
|
||||||
|
WriteOperationType.BOOTSTRAP.value,
|
||||||
|
WriteOperationType.INSERT_OVERWRITE.value,
|
||||||
|
WriteOperationType.CLUSTER.value,
|
||||||
|
WriteOperationType.DELETE_PARTITION.value,
|
||||||
|
WriteOperationType.INSERT_OVERWRITE_TABLE.value,
|
||||||
|
WriteOperationType.COMPACT.value,
|
||||||
|
WriteOperationType.INSERT.value,
|
||||||
|
WriteOperationType.ALTER_SCHEMA.value
|
||||||
|
)
|
||||||
.withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
|
.withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
|
||||||
"Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
|
"Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
|
||||||
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
|
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
|
||||||
@@ -220,6 +239,7 @@ object DataSourceWriteOptions {
|
|||||||
val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
|
val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
|
||||||
.key("hoodie.datasource.write.table.type")
|
.key("hoodie.datasource.write.table.type")
|
||||||
.defaultValue(COW_TABLE_TYPE_OPT_VAL)
|
.defaultValue(COW_TABLE_TYPE_OPT_VAL)
|
||||||
|
.withValidValues(COW_TABLE_TYPE_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL)
|
||||||
.withAlternatives("hoodie.datasource.write.storage.type")
|
.withAlternatives("hoodie.datasource.write.storage.type")
|
||||||
.withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
|
.withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
|
||||||
|
|
||||||
@@ -308,7 +328,8 @@ object DataSourceWriteOptions {
|
|||||||
Option.of(classOf[NonpartitionedKeyGenerator].getName)
|
Option.of(classOf[NonpartitionedKeyGenerator].getName)
|
||||||
} else {
|
} else {
|
||||||
val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
|
val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
|
||||||
if (numOfPartFields == 1) {
|
val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
|
||||||
|
if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
|
||||||
Option.of(classOf[SimpleKeyGenerator].getName)
|
Option.of(classOf[SimpleKeyGenerator].getName)
|
||||||
} else {
|
} else {
|
||||||
Option.of(classOf[ComplexKeyGenerator].getName)
|
Option.of(classOf[ComplexKeyGenerator].getName)
|
||||||
|
|||||||
Reference in New Issue
Block a user