[HUDI-1412] Make HoodieWriteConfig support setting different default value according to engine type (#2278)

* [HUDI-1412] Make HoodieWriteConfig support setting different default value according to engine type
Author: wangxianghu
Date: 2020-12-07 09:29:53 +08:00
Committed by: GitHub
Parent: 319b7a58e4
Commit: de2fbeac33
5 changed files with 105 additions and 11 deletions

org/apache/hudi/client/common/EngineType.java (new file)

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.common;
/**
* Hoodie data processing engine. Only Apache Spark and Apache Flink are supported for now.
*/
public enum EngineType {
SPARK, FLINK
}
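
A usage sketch, not part of this commit: resolving the engine type from a plain configuration string. The hoodie.engine.type key below is hypothetical, not defined by Hudi.

import java.util.Locale;
import java.util.Properties;

import org.apache.hudi.client.common.EngineType;

public final class EngineTypeResolver {
  // Falls back to SPARK, matching the default the config builders in this
  // commit assume when no engine type is supplied.
  static EngineType resolve(Properties props) {
    String name = props.getProperty("hoodie.engine.type", EngineType.SPARK.name());
    return EngineType.valueOf(name.toUpperCase(Locale.ROOT));
  }
}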

org/apache/hudi/config/HoodieIndexConfig.java

@@ -20,6 +20,8 @@ package org.apache.hudi.config;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
+import org.apache.hudi.client.common.EngineType;
+import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import javax.annotation.concurrent.Immutable;
@@ -36,7 +38,6 @@ import java.util.Properties;
public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String INDEX_TYPE_PROP = "hoodie.index.type";
-public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();
public static final String INDEX_CLASS_PROP = "hoodie.index.class";
public static final String DEFAULT_INDEX_CLASS = "";
@@ -103,8 +104,18 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path";
public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false";
+private EngineType engineType;
+/**
+ * Use Spark engine by default.
+ */
private HoodieIndexConfig(Properties props) {
-super(props);
+this(EngineType.SPARK, props);
}
+private HoodieIndexConfig(EngineType engineType, Properties props) {
+super(props);
+this.engineType = engineType;
+}
public static HoodieIndexConfig.Builder newBuilder() {
@@ -113,6 +124,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static class Builder {
+private EngineType engineType = EngineType.SPARK;
private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException {
@@ -237,9 +249,14 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
+public Builder withEngineType(EngineType engineType) {
+this.engineType = engineType;
+return this;
+}
public HoodieIndexConfig build() {
-HoodieIndexConfig config = new HoodieIndexConfig(props);
-setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
+HoodieIndexConfig config = new HoodieIndexConfig(engineType, props);
+setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, getDefaultIndexType(engineType));
setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP), INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS);
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
@@ -278,5 +295,20 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
}
+private String getDefaultIndexType(EngineType engineType) {
+switch (engineType) {
+case SPARK:
+return HoodieIndex.IndexType.BLOOM.name();
+case FLINK:
+return HoodieIndex.IndexType.INMEMORY.name();
+default:
+throw new HoodieNotSupportedException("Unsupported engine " + engineType);
+}
+}
+public EngineType getEngineType() {
+return engineType;
+}
}
}
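
Net effect of the changes in this file, as a minimal sketch using only the builder methods shown above (the comments state the defaults getDefaultIndexType computes):

import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.config.HoodieIndexConfig;

public class IndexDefaultDemo {
  public static void main(String[] args) {
    // No engine supplied: the builder assumes SPARK, so hoodie.index.type defaults to BLOOM.
    HoodieIndexConfig sparkDefaults = HoodieIndexConfig.newBuilder().build();
    System.out.println(sparkDefaults.getProps().getProperty("hoodie.index.type")); // BLOOM

    // FLINK engine: hoodie.index.type defaults to INMEMORY instead.
    HoodieIndexConfig flinkDefaults = HoodieIndexConfig.newBuilder()
        .withEngineType(EngineType.FLINK)
        .build();
    System.out.println(flinkDefaults.getProps().getProperty("hoodie.index.type")); // INMEMORY
  }
}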

org/apache/hudi/config/HoodieWriteConfig.java

@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -124,10 +125,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
-*
+* <p>
* Given Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
* only works with COW tables. Hudi 0.5.x had stopped this behavior.
-*
+* <p>
* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
* (disabled by default) which will allow this old behavior.
*/
@@ -145,10 +146,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
private FileSystemViewStorageConfig viewStorageConfig;
+private EngineType engineType;
+/**
+ * Use Spark engine by default.
+ */
protected HoodieWriteConfig(Properties props) {
+this(EngineType.SPARK, props);
+}
+protected HoodieWriteConfig(EngineType engineType, Properties props) {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
+this.engineType = engineType;
this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
@@ -290,6 +301,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
}
+public EngineType getEngineType() {
+return engineType;
+}
/**
* compaction properties.
*/
@@ -779,6 +794,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static class Builder {
protected final Properties props = new Properties();
+protected EngineType engineType = EngineType.SPARK;
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
@@ -789,6 +805,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isConsistencyGuardSet = false;
private boolean isCallbackConfigSet = false;
+public Builder withEngineType(EngineType engineType) {
+this.engineType = engineType;
+return this;
+}
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
@@ -1049,7 +1070,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
// Make sure the props is propagated
-setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
+setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
@@ -1081,7 +1102,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaults();
validate();
// Build WriteConfig at the end
-HoodieWriteConfig config = new HoodieWriteConfig(props);
+HoodieWriteConfig config = new HoodieWriteConfig(engineType, props);
return config;
}
}
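
End-to-end sketch of the write-config behavior (the "/tmp/hoodie_table" path is a placeholder): the engine-derived default only fills hoodie.index.type when it is absent, so an explicitly supplied index config still takes precedence.

import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class EngineDefaultDemo {
  public static void main(String[] args) {
    // Engine-derived default: FLINK -> INMEMORY.
    HoodieWriteConfig flinkDefaults = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.FLINK)
        .withPath("/tmp/hoodie_table") // placeholder base path
        .build();
    System.out.println(flinkDefaults.getIndexType()); // INMEMORY

    // An explicit index type still wins over the engine default.
    HoodieWriteConfig explicit = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.FLINK)
        .withPath("/tmp/hoodie_table")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .build();
    System.out.println(explicit.getIndexType()); // BLOOM
  }
}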

org/apache/hudi/config/TestHoodieWriteConfig.java

@@ -18,8 +18,10 @@
package org.apache.hudi.config;
+import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
+import org.apache.hudi.index.HoodieIndex;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
@@ -54,6 +56,21 @@ public class TestHoodieWriteConfig {
assertEquals(2, config.getMinCommitsToKeep());
}
+@Test
+public void testDefaultIndexAccordingToEngineType() {
+// default bloom
+HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType());
+// spark default bloom
+writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
+assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType());
+// flink default in-memory
+writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath("/tmp").build();
+assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
+}
private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
Properties properties = new Properties();
properties.putAll(params);

StreamerUtil.java

@@ -22,14 +22,13 @@ import org.apache.hudi.HoodieFlinkStreamer;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
@@ -134,10 +133,9 @@ public class StreamerUtil {
public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
HoodieWriteConfig.Builder builder =
-HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
+HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
.forTable(cfg.targetTableName)
-.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withAutoCommit(false)
.withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
.getConfig());
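
Design note: with withEngineType(EngineType.FLINK) in place, the explicit INMEMORY index configuration removed above is redundant; HoodieIndexConfig now derives the same INMEMORY default from the engine type, which is exactly what the new TestHoodieWriteConfig case verifies.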