1
0

[HUDI-3640] Set SimpleKeyGenerator as default in 2to3 table upgrade for Spark engine (#5075)

This commit is contained in:
Y Ethan Guo
2022-03-21 17:35:06 -07:00
committed by GitHub
parent ca0931d332
commit 9b6e138af2
4 changed files with 104 additions and 9 deletions

View File

@@ -20,6 +20,7 @@
 package org.apache.hudi.table.upgrade;

 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
@@ -34,6 +35,8 @@ import java.util.Map;
  * UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 2 to 3.
  */
 public class TwoToThreeUpgradeHandler implements UpgradeHandler {
+  public static final String SPARK_SIMPLE_KEY_GENERATOR = "org.apache.hudi.keygen.SimpleKeyGenerator";
+
   @Override
   public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
@@ -47,6 +50,11 @@ public class TwoToThreeUpgradeHandler implements UpgradeHandler {
     tablePropsToAdd.put(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, config.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
     String keyGenClassName = Option.ofNullable(config.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME))
         .orElse(config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME));
+    if (keyGenClassName == null && config.getEngineType() == EngineType.SPARK) {
+      // For Spark, if the key generator class is not configured by user,
+      // set it to SimpleKeyGenerator as default
+      keyGenClassName = SPARK_SIMPLE_KEY_GENERATOR;
+    }
     ValidationUtils.checkState(keyGenClassName != null, String.format("Missing config: %s or %s",
         HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, HoodieWriteConfig.KEYGENERATOR_CLASS_NAME));
     tablePropsToAdd.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGenClassName);

View File

@@ -21,13 +21,14 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;

 import java.util.Map;
@@ -58,11 +59,24 @@ class TestTwoToThreeUpgradeHandler {
     assertEquals(KeyGenerator.class.getName(), kv.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
   }

-  @Test
-  void upgradeHandlerShouldThrowWhenKeyGeneratorNotSet() {
+  @ParameterizedTest
+  @EnumSource(EngineType.class)
+  void upgradeHandlerWhenKeyGeneratorNotSet(EngineType engineType) {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(engineType)
+        .forTable("foo")
+        .withPath("/foo")
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
     TwoToThreeUpgradeHandler handler = new TwoToThreeUpgradeHandler();
+    if (engineType == EngineType.SPARK) {
+      Map<ConfigProperty, String> kv = handler.upgrade(config, null, null, null);
+      assertEquals(TwoToThreeUpgradeHandler.SPARK_SIMPLE_KEY_GENERATOR,
+          kv.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
+    } else {
       Throwable t = assertThrows(IllegalStateException.class, () -> handler
-          .upgrade(config, null, null, null));
+          .upgrade(writeConfig, null, null, null));
       assertTrue(t.getMessage().startsWith("Missing config:"));
+    }
   }
 }

View File

@@ -19,8 +19,8 @@
 package org.apache.hudi.client.clustering.run.strategy;

-import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +30,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+
+import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -63,7 +65,8 @@ public class JavaSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
     // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
     props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
     props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
-    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA).withProps(props).build();
     return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
   }

View File

@@ -38,6 +38,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -117,6 +119,16 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     return Stream.of(data).map(Arguments::of);
   }

+  public static Stream<Arguments> twoToThreeUpgradeConfigParams() {
+    Object[][] data = new Object[][] {
+        {HoodieTableType.COPY_ON_WRITE, Option.empty()},
+        {HoodieTableType.COPY_ON_WRITE, Option.of(TimestampBasedKeyGenerator.class.getName())},
+        {HoodieTableType.MERGE_ON_READ, Option.empty()},
+        {HoodieTableType.MERGE_ON_READ, Option.of(TimestampBasedKeyGenerator.class.getName())}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts();
@@ -232,6 +244,51 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     assertTableProps(cfg);
   }

+  @ParameterizedTest
+  @MethodSource("twoToThreeUpgradeConfigParams")
+  public void testUpgradeTwoToThree(
+      HoodieTableType tableType, Option<String> keyGeneratorClass) throws IOException {
+    // init config, table and client.
+    Map<String, String> params = new HashMap<>();
+    addNewTableParamsToProps(params);
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      params.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+      metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+    }
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder()
+        .withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params);
+    if (keyGeneratorClass.isPresent()) {
+      cfgBuilder.withKeyGenerator(keyGeneratorClass.get());
+    }
+    HoodieWriteConfig cfg = cfgBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+    // Write inserts
+    doInsert(client);
+
+    // downgrade table props
+    downgradeTableConfigsFromThreeToTwo(cfg);
+
+    // perform upgrade
+    new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.THREE, null);
+
+    // verify hoodie.table.version got upgraded
+    metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
+    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());
+    assertTableVersionFromPropertyFile(HoodieTableVersion.THREE);
+
+    // verify table props
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    Properties originalProps = cfg.getProps();
+    assertEquals(tableConfig.getUrlEncodePartitioning(),
+        cfg.getStringOrDefault(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+    assertEquals(tableConfig.getHiveStylePartitioningEnable(),
+        cfg.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+    assertEquals(tableConfig.getKeyGeneratorClassName(), originalProps.getOrDefault(
+        HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
+  }
+
   @Test
   public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOException {
     // init config, table and client.
@@ -298,6 +355,19 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
   }

+  private void downgradeTableConfigsFromThreeToTwo(HoodieWriteConfig cfg) throws IOException {
+    Properties properties = new Properties(cfg.getProps());
+    properties.remove(HoodieTableConfig.URL_ENCODE_PARTITIONING.key());
+    properties.remove(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key());
+    properties.remove(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key());
+    properties.remove(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key());
+    properties.setProperty(HoodieTableConfig.VERSION.key(), "2");
+
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType(), properties);
+    // set hoodie.table.version to 2 in hoodie.properties file
+    metaClient.getTableConfig().setTableVersion(HoodieTableVersion.TWO);
+  }
+
   private void assertTableProps(HoodieWriteConfig cfg) {
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
     Properties originalProps = cfg.getProps();