From ed106f671e929d2a6277ae189242201a02a7b58f Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 18 Feb 2022 10:17:06 +0530 Subject: [PATCH] [HUDI-2809] Introduce a checksum mechanism for validating hoodie.properties (#4712) Fix dependency conflict Fix repairs command Implement putIfAbsent for DDB lock provider Add upgrade step and validate while fetching configs Validate checksum for latest table version only while fetching config Move generateChecksum to BinaryUtil Rebase and resolve conflict Fix table version check --- .../hudi/cli/commands/RepairsCommand.java | 5 +- .../hudi/cli/commands/TestRepairsCommand.java | 32 +++++--- hudi-client/hudi-client-common/pom.xml | 1 - .../upgrade/FourToThreeDowngradeHandler.java | 38 ++++++++++ .../upgrade/ThreeToFourUpgradeHandler.java | 41 +++++++++++ .../hudi/table/upgrade/UpgradeDowngrade.java | 4 + hudi-client/hudi-spark-client/pom.xml | 6 +- .../functional/TestHoodieBackedMetadata.java | 4 +- .../table/upgrade/TestUpgradeDowngrade.java | 37 ++++++++++ .../hudi/common/config/TypedProperties.java | 46 ++++++++++++ .../hudi/common/table/HoodieTableConfig.java | 73 ++++++++++++++++++- .../hudi/common/table/HoodieTableVersion.java | 6 +- .../apache/hudi/common/util/BinaryUtil.java | 10 +++ .../hudi/common/util/SpillableMapUtils.java | 11 +-- .../util/collection/BitCaskDiskMap.java | 4 +- .../properties/TestTypedProperties.java | 23 ++++++ .../common/table/TestHoodieTableConfig.java | 10 +-- .../table/TestHoodieTableMetaClient.java | 2 + hudi-integ-test/pom.xml | 1 - pom.xml | 6 ++ 20 files changed, 320 insertions(+), 40 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 2533562d8..6c068c898 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -45,7 +45,6 @@ import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; import scala.collection.JavaConverters; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.List; @@ -153,10 +152,12 @@ public class RepairsCommand implements CommandMarker { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); Properties newProps = new Properties(); - newProps.load(new FileInputStream(new File(overwriteFilePath))); + newProps.load(new FileInputStream(overwriteFilePath)); Map oldProps = client.getTableConfig().propsMap(); Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME); HoodieTableConfig.create(client.getFs(), metaPathDir, newProps); + // reload new props as checksum would have been added + newProps = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps(); TreeSet allPropKeys = new TreeSet<>(); allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet())); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index 048b2a20e..27cc31cce 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -39,7 +39,6 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.URL; @@ -51,6 +50,14 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; +import static org.apache.hudi.common.table.HoodieTableConfig.NAME; +import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM; +import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_LAYOUT_VERSION; +import static org.apache.hudi.common.table.HoodieTableConfig.TYPE; +import static org.apache.hudi.common.table.HoodieTableConfig.VERSION; +import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum; +import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -104,7 +111,7 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { // expected all 'No'. String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath) .stream() - .map(partition -> new String[]{partition, "No", "None"}) + .map(partition -> new String[] {partition, "No", "None"}) .toArray(String[][]::new); String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); @@ -135,7 +142,7 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { List paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath); // after dry run, the action will be 'Repaired' String[][] rows = paths.stream() - .map(partition -> new String[]{partition, "No", "Repaired"}) + .map(partition -> new String[] {partition, "No", "Repaired"}) .toArray(String[][]::new); String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); @@ -147,7 +154,7 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { // after real run, Metadata is present now. rows = paths.stream() - .map(partition -> new String[]{partition, "Yes", "None"}) + .map(partition -> new String[] {partition, "Yes", "None"}) .toArray(String[][]::new); expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); @@ -170,19 +177,24 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { Map oldProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap(); // after overwrite, the stored value in .hoodie is equals to which read from properties. - Map result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().propsMap(); + HoodieTableConfig tableConfig = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig(); + Map result = tableConfig.propsMap(); + // validate table checksum + assertTrue(result.containsKey(TABLE_CHECKSUM.key())); + assertTrue(validateChecksum(tableConfig.getProps())); Properties expectProps = new Properties(); - expectProps.load(new FileInputStream(new File(newProps.getPath()))); + expectProps.load(new FileInputStream(newProps.getPath())); Map expected = expectProps.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); + expected.putIfAbsent(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(tableConfig.getProps()))); assertEquals(expected, result); // check result - List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", "hoodie.table.version", - "hoodie.archivelog.folder", "hoodie.timeline.layout.version"); - String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key, - oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")}) + List allPropsStr = Arrays.asList(NAME.key(), TYPE.key(), VERSION.key(), + ARCHIVELOG_FOLDER.key(), TIMELINE_LAYOUT_VERSION.key(), TABLE_CHECKSUM.key()); + String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key, + oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")}) .toArray(String[][]::new); String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY, HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows); diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index a9209f553..a55a13665 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -163,7 +163,6 @@ org.awaitility awaitility - 3.1.2 test diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java new file mode 100644 index 000000000..17dc01d02 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Collections; +import java.util.Map; + +/** + * DowngradeHandler to assist in downgrading {@link org.apache.hudi.table.HoodieTable} from version 4 to 3. + */ +public class FourToThreeDowngradeHandler implements DowngradeHandler { + + @Override + public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + return Collections.emptyMap(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java new file mode 100644 index 000000000..72e96bb41 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Hashtable; +import java.util.Map; + +/** + * UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 3 to 4. + */ +public class ThreeToFourUpgradeHandler implements UpgradeHandler { + + @Override + public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + Map tablePropsToAdd = new Hashtable<>(); + tablePropsToAdd.put(HoodieTableConfig.TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps()))); + return tablePropsToAdd; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index e1e7e2b33..1a75ff51c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -143,6 +143,8 @@ public class UpgradeDowngrade { return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) { return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.FOUR) { + return new ThreeToFourUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); } @@ -155,6 +157,8 @@ public class UpgradeDowngrade { return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) { return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.THREE) { + return new FourToThreeDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); } diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml index e4a8fd56b..d6c60cb61 100644 --- a/hudi-client/hudi-spark-client/pom.xml +++ b/hudi-client/hudi-spark-client/pom.xml @@ -169,9 +169,9 @@ - org.awaitility - awaitility - test + org.awaitility + awaitility + test diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f175dc5de..37595f6f7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1548,7 +1548,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime()); initMetaClient(); - assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode()); assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath)); assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); @@ -1630,7 +1630,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } initMetaClient(); - assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode()); assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath)); assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 5f5dfdec5..fea205694 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -232,6 +232,43 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { assertTableProps(cfg); } + @Test + public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOException { + // init config, table and client. + Map params = new HashMap<>(); + addNewTableParamsToProps(params); + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); + + // write inserts + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + doInsert(client); + + // current version should have TABLE_CHECKSUM key + assertEquals(HoodieTableVersion.current(), metaClient.getTableConfig().getTableVersion()); + assertTableVersionFromPropertyFile(HoodieTableVersion.current()); + assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key())); + String checksum = metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()); + + // downgrade to version 3 and check TABLE_CHECKSUM is still present + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.THREE, null); + assertEquals(HoodieTableVersion.THREE.versionCode(), metaClient.getTableConfig().getTableVersion().versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.THREE); + assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key())); + assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key())); + + // remove TABLE_CHECKSUM and upgrade to current version + metaClient.getTableConfig().getProps().remove(HoodieTableConfig.TABLE_CHECKSUM.key()); + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.current(), null); + + // verify upgrade and TABLE_CHECKSUM + metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build(); + assertEquals(HoodieTableVersion.current().versionCode(), metaClient.getTableConfig().getTableVersion().versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.current()); + assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key())); + assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key())); + } + private void addNewTableParamsToProps(Map params) { params.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid"); params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java index 2688e6454..4dbb0bc2b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java @@ -20,9 +20,14 @@ package org.apache.hudi.common.config; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; /** @@ -30,6 +35,8 @@ import java.util.stream.Collectors; */ public class TypedProperties extends Properties implements Serializable { + private final HashSet keys = new LinkedHashSet<>(); + public TypedProperties() { super(null); } @@ -42,6 +49,45 @@ public class TypedProperties extends Properties implements Serializable { } } + @Override + public Enumeration propertyNames() { + return Collections.enumeration(keys); + } + + @Override + public synchronized Enumeration keys() { + return Collections.enumeration(keys); + } + + @Override + public Set stringPropertyNames() { + Set set = new LinkedHashSet<>(); + for (Object key : this.keys) { + set.add((String) key); + } + return set; + } + + @Override + public synchronized Object put(Object key, Object value) { + keys.remove(key); + keys.add(key); + return super.put(key, value); + } + + public synchronized Object putIfAbsent(Object key, Object value) { + if (!containsKey(String.valueOf(key))) { + keys.add(key); + } + return super.putIfAbsent(key, value); + } + + @Override + public Object remove(Object key) { + keys.remove(key); + return super.remove(key); + } + private void checkKey(String property) { if (!containsKey(property)) { throw new IllegalArgumentException("Property " + property + " not found"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index b26de60f2..dc010366c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -38,6 +39,7 @@ import org.apache.hudi.common.model.HoodieTimelineTimeZone; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.util.BinaryUtil; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -58,6 +60,8 @@ import java.util.Set; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during * initializing a path as hoodie base path and never changes during the lifetime of a hoodie table. @@ -197,6 +201,14 @@ public class HoodieTableConfig extends HoodieConfig { public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName(); + public static final ConfigProperty TABLE_CHECKSUM = ConfigProperty + .key("hoodie.table.checksum") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("Table checksum is used to guard against partial writes in HDFS. It is added as the last entry in hoodie.properties and then used to validate while reading table config."); + + private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // . + public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { super(); Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); @@ -208,6 +220,9 @@ public class HoodieTableConfig extends HoodieConfig { setValue(PAYLOAD_CLASS_NAME, payloadClassName); // FIXME(vc): wonder if this can be removed. Need to look into history. try (FSDataOutputStream outputStream = fs.create(propertyPath)) { + if (!isValidChecksum()) { + setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(props))); + } props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); } } @@ -218,6 +233,10 @@ public class HoodieTableConfig extends HoodieConfig { "hoodie.properties file seems invalid. Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry"); } + private boolean isValidChecksum() { + return contains(TABLE_CHECKSUM) && validateChecksum(props); + } + /** * For serializing and de-serializing. */ @@ -227,13 +246,20 @@ public class HoodieTableConfig extends HoodieConfig { private void fetchConfigs(FileSystem fs, String metaPath) throws IOException { Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); + Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP); try (FSDataInputStream is = fs.open(cfgPath)) { props.load(is); + // validate checksum for latest table version + if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) { + LOG.warn("Checksum validation failed. Falling back to backed up configs."); + try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) { + props.load(fsDataInputStream); + } + } } catch (IOException ioe) { if (!fs.exists(cfgPath)) { LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs."); // try the backup. this way no query ever fails if update fails midway. - Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP); try (FSDataInputStream is = fs.open(backupCfgPath)) { props.load(is); } @@ -284,15 +310,31 @@ public class HoodieTableConfig extends HoodieConfig { /// 2. delete the properties file, reads will go to the backup, until we are done. fs.delete(cfgPath, false); // 3. read current props, upsert and save back. + String checksum; try (FSDataInputStream in = fs.open(backupCfgPath); FSDataOutputStream out = fs.create(cfgPath, true)) { - Properties props = new Properties(); + Properties props = new TypedProperties(); props.load(in); modifyFn.accept(props, modifyProps); + if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) { + checksum = props.getProperty(TABLE_CHECKSUM.key()); + } else { + checksum = String.valueOf(generateChecksum(props)); + props.setProperty(TABLE_CHECKSUM.key(), checksum); + } props.store(out, "Updated at " + System.currentTimeMillis()); } // 4. verify and remove backup. - // FIXME(vc): generate a hash for verification. + try (FSDataInputStream in = fs.open(cfgPath)) { + Properties props = new TypedProperties(); + props.load(in); + if (!props.containsKey(TABLE_CHECKSUM.key()) || !props.getProperty(TABLE_CHECKSUM.key()).equals(checksum)) { + // delete the properties file and throw exception indicating update failure + // subsequent writes will recover and update, reads will go to the backup until then + fs.delete(cfgPath, false); + throw new HoodieIOException("Checksum property missing or does not match."); + } + } fs.delete(backupCfgPath, false); } catch (IOException e) { throw new HoodieIOException("Error updating table configs.", e); @@ -343,10 +385,28 @@ public class HoodieTableConfig extends HoodieConfig { if (hoodieConfig.contains(TIMELINE_TIMEZONE)) { HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE))); } + if (hoodieConfig.contains(TABLE_CHECKSUM)) { + hoodieConfig.setValue(TABLE_CHECKSUM, hoodieConfig.getString(TABLE_CHECKSUM)); + } else { + hoodieConfig.setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(hoodieConfig.getProps()))); + } hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); } } + public static long generateChecksum(Properties props) { + if (!props.containsKey(NAME.key())) { + throw new IllegalArgumentException(NAME.key() + " property needs to be specified"); + } + String table = props.getProperty(NAME.key()); + String database = props.getProperty(DATABASE_NAME.key(), ""); + return BinaryUtil.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); + } + + public static boolean validateChecksum(Properties props) { + return Long.parseLong(props.getProperty(TABLE_CHECKSUM.key())) == generateChecksum(props); + } + /** * Read the table type from the table properties and if not found, return the default. */ @@ -505,6 +565,13 @@ public class HoodieTableConfig extends HoodieConfig { return getString(URL_ENCODE_PARTITIONING); } + /** + * Read the table checksum. + */ + private Long getTableChecksum() { + return getLong(TABLE_CHECKSUM); + } + public Map propsMap() { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java index 122c38775..3a249689a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java @@ -34,7 +34,9 @@ public enum HoodieTableVersion { // 0.9.0 onwards TWO(2), // 0.10.0 onwards - THREE(3); + THREE(3), + // 0.11.0 onwards + FOUR(4); private final int versionCode; @@ -47,7 +49,7 @@ public enum HoodieTableVersion { } public static HoodieTableVersion current() { - return THREE; + return FOUR; } public static HoodieTableVersion versionFromCode(int versionCode) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java index 0c7e89895..9fec2c8cf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.util; import java.nio.charset.Charset; +import java.util.zip.CRC32; public class BinaryUtil { @@ -187,5 +188,14 @@ public class BinaryUtil { } return temp; } + + /** + * Generate a checksum for a given set of bytes. + */ + public static long generateChecksum(byte[] data) { + CRC32 crc = new CRC32(); + crc.update(data); + return crc.getValue(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index b38eeba3f..9ded41543 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -33,9 +33,9 @@ import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.zip.CRC32; import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; +import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; /** * A utility class supports spillable map. @@ -95,15 +95,6 @@ public class SpillableMapUtils { return outputStream.getSize(); } - /** - * Generate a checksum for a given set of bytes. - */ - public static long generateChecksum(byte[] data) { - CRC32 crc = new CRC32(); - crc.update(data); - return crc.getValue(); - } - /** * Compute a bytes representation of the payload by serializing the contents This is used to estimate the size of the * payload (either in memory or when written to disk). diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 289901df8..9fb0b20e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -56,6 +56,8 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; +import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; + /** * This class provides a disk spillable only map implementation. All of the data is currenly written to one file, * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata. @@ -223,7 +225,7 @@ public final class BitCaskDiskMap org.awaitility awaitility - 3.1.2 test diff --git a/pom.xml b/pom.xml index 8ebf7a9ef..fa1ee3503 100644 --- a/pom.xml +++ b/pom.xml @@ -1119,6 +1119,12 @@ awaitility ${awaitility.version} test + + + org.objenesis + objenesis + +