diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 2533562d8..6c068c898 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -45,7 +45,6 @@ import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; import scala.collection.JavaConverters; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.List; @@ -153,10 +152,12 @@ public class RepairsCommand implements CommandMarker { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); Properties newProps = new Properties(); - newProps.load(new FileInputStream(new File(overwriteFilePath))); + newProps.load(new FileInputStream(overwriteFilePath)); Map oldProps = client.getTableConfig().propsMap(); Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME); HoodieTableConfig.create(client.getFs(), metaPathDir, newProps); + // reload new props as checksum would have been added + newProps = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps(); TreeSet allPropKeys = new TreeSet<>(); allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet())); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index 048b2a20e..27cc31cce 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -39,7 +39,6 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import 
java.net.URL; @@ -51,6 +50,14 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; +import static org.apache.hudi.common.table.HoodieTableConfig.NAME; +import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM; +import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_LAYOUT_VERSION; +import static org.apache.hudi.common.table.HoodieTableConfig.TYPE; +import static org.apache.hudi.common.table.HoodieTableConfig.VERSION; +import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum; +import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -104,7 +111,7 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { // expected all 'No'. 
String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath) .stream() - .map(partition -> new String[]{partition, "No", "None"}) + .map(partition -> new String[] {partition, "No", "None"}) .toArray(String[][]::new); String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); @@ -135,7 +142,7 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { List paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath); // after dry run, the action will be 'Repaired' String[][] rows = paths.stream() - .map(partition -> new String[]{partition, "No", "Repaired"}) + .map(partition -> new String[] {partition, "No", "Repaired"}) .toArray(String[][]::new); String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); @@ -147,7 +154,7 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { // after real run, Metadata is present now. rows = paths.stream() - .map(partition -> new String[]{partition, "Yes", "None"}) + .map(partition -> new String[] {partition, "Yes", "None"}) .toArray(String[][]::new); expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH, HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows); @@ -170,19 +177,24 @@ public class TestRepairsCommand extends CLIFunctionalTestHarness { Map oldProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap(); // after overwrite, the stored value in .hoodie is equals to which read from properties. 
- Map result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().propsMap(); + HoodieTableConfig tableConfig = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig(); + Map result = tableConfig.propsMap(); + // validate table checksum + assertTrue(result.containsKey(TABLE_CHECKSUM.key())); + assertTrue(validateChecksum(tableConfig.getProps())); Properties expectProps = new Properties(); - expectProps.load(new FileInputStream(new File(newProps.getPath()))); + expectProps.load(new FileInputStream(newProps.getPath())); Map expected = expectProps.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); + expected.putIfAbsent(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(tableConfig.getProps()))); assertEquals(expected, result); // check result - List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", "hoodie.table.version", - "hoodie.archivelog.folder", "hoodie.timeline.layout.version"); - String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key, - oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")}) + List allPropsStr = Arrays.asList(NAME.key(), TYPE.key(), VERSION.key(), + ARCHIVELOG_FOLDER.key(), TIMELINE_LAYOUT_VERSION.key(), TABLE_CHECKSUM.key()); + String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key, + oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")}) .toArray(String[][]::new); String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY, HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows); diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index a9209f553..a55a13665 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -163,7 +163,6 @@ org.awaitility awaitility - 3.1.2 test diff 
--git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java new file mode 100644 index 000000000..17dc01d02 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FourToThreeDowngradeHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Collections; +import java.util.Map; + +/** + * DowngradeHandler to assist in downgrading {@link org.apache.hudi.table.HoodieTable} from version 4 to 3. 
+ */ +public class FourToThreeDowngradeHandler implements DowngradeHandler { + + @Override + public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + return Collections.emptyMap(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java new file mode 100644 index 000000000..72e96bb41 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Hashtable; +import java.util.Map; + +/** + * UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 3 to 4. 
+ */ +public class ThreeToFourUpgradeHandler implements UpgradeHandler { + + @Override + public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { + Map tablePropsToAdd = new Hashtable<>(); + tablePropsToAdd.put(HoodieTableConfig.TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps()))); + return tablePropsToAdd; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index e1e7e2b33..1a75ff51c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -143,6 +143,8 @@ public class UpgradeDowngrade { return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) { return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.FOUR) { + return new ThreeToFourUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); } @@ -155,6 +157,8 @@ public class UpgradeDowngrade { return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) { return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.THREE) { + 
return new FourToThreeDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); } diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml index e4a8fd56b..d6c60cb61 100644 --- a/hudi-client/hudi-spark-client/pom.xml +++ b/hudi-client/hudi-spark-client/pom.xml @@ -169,9 +169,9 @@ - org.awaitility - awaitility - test + org.awaitility + awaitility + test diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f175dc5de..37595f6f7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1548,7 +1548,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime()); initMetaClient(); - assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode()); assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath)); assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); @@ -1630,7 +1630,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { } initMetaClient(); - assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), 
HoodieTableVersion.FOUR.versionCode()); assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath)); assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 5f5dfdec5..fea205694 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -232,6 +232,43 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { assertTableProps(cfg); } + @Test + public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOException { + // init config, table and client. + Map params = new HashMap<>(); + addNewTableParamsToProps(params); + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); + + // write inserts + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + doInsert(client); + + // current version should have TABLE_CHECKSUM key + assertEquals(HoodieTableVersion.current(), metaClient.getTableConfig().getTableVersion()); + assertTableVersionFromPropertyFile(HoodieTableVersion.current()); + assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key())); + String checksum = metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()); + + // downgrade to version 3 and check TABLE_CHECKSUM is still present + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.THREE, null); + assertEquals(HoodieTableVersion.THREE.versionCode(), 
metaClient.getTableConfig().getTableVersion().versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.THREE); + assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key())); + assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key())); + + // remove TABLE_CHECKSUM and upgrade to current version + metaClient.getTableConfig().getProps().remove(HoodieTableConfig.TABLE_CHECKSUM.key()); + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.current(), null); + + // verify upgrade and TABLE_CHECKSUM + metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build(); + assertEquals(HoodieTableVersion.current().versionCode(), metaClient.getTableConfig().getTableVersion().versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.current()); + assertTrue(metaClient.getTableConfig().getProps().containsKey(HoodieTableConfig.TABLE_CHECKSUM.key())); + assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key())); + } + private void addNewTableParamsToProps(Map params) { params.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid"); params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java index 2688e6454..4dbb0bc2b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java @@ -20,9 +20,14 @@ package org.apache.hudi.common.config; import java.io.Serializable; import java.util.Arrays; +import 
java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; /** @@ -30,6 +35,8 @@ import java.util.stream.Collectors; */ public class TypedProperties extends Properties implements Serializable { + private final HashSet keys = new LinkedHashSet<>(); + public TypedProperties() { super(null); } @@ -42,6 +49,45 @@ public class TypedProperties extends Properties implements Serializable { } } + @Override + public Enumeration propertyNames() { + return Collections.enumeration(keys); + } + + @Override + public synchronized Enumeration keys() { + return Collections.enumeration(keys); + } + + @Override + public Set stringPropertyNames() { + Set set = new LinkedHashSet<>(); + for (Object key : this.keys) { + set.add((String) key); + } + return set; + } + + @Override + public synchronized Object put(Object key, Object value) { + keys.remove(key); + keys.add(key); + return super.put(key, value); + } + + public synchronized Object putIfAbsent(Object key, Object value) { + if (!containsKey(key)) { // test the key that is actually inserted, so 'keys' cannot drift from the table + keys.add(key); + } + return super.putIfAbsent(key, value); + } + + @Override + public synchronized Object remove(Object key) { // same monitor as put(), so 'keys' and the table change together + keys.remove(key); + return super.remove(key); + } + private void checkKey(String property) { if (!containsKey(property)) { throw new IllegalArgumentException("Property " + property + " not found"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index b26de60f2..dc010366c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import
org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -38,6 +39,7 @@ import org.apache.hudi.common.model.HoodieTimelineTimeZone; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.util.BinaryUtil; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -58,6 +60,8 @@ import java.util.Set; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during * initializing a path as hoodie base path and never changes during the lifetime of a hoodie table. @@ -197,6 +201,14 @@ public class HoodieTableConfig extends HoodieConfig { public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName(); + public static final ConfigProperty TABLE_CHECKSUM = ConfigProperty + .key("hoodie.table.checksum") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("Table checksum is used to guard against partial writes in HDFS. It is added as the last entry in hoodie.properties and then used to validate while reading table config."); + + private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // . 
+ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { super(); Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); @@ -208,6 +220,9 @@ public class HoodieTableConfig extends HoodieConfig { setValue(PAYLOAD_CLASS_NAME, payloadClassName); // FIXME(vc): wonder if this can be removed. Need to look into history. try (FSDataOutputStream outputStream = fs.create(propertyPath)) { + if (!isValidChecksum()) { + setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(props))); + } props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); } } @@ -218,6 +233,10 @@ public class HoodieTableConfig extends HoodieConfig { "hoodie.properties file seems invalid. Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry"); } + private boolean isValidChecksum() { + return contains(TABLE_CHECKSUM) && validateChecksum(props); + } + /** * For serializing and de-serializing. */ @@ -227,13 +246,20 @@ public class HoodieTableConfig extends HoodieConfig { private void fetchConfigs(FileSystem fs, String metaPath) throws IOException { Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE); + Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP); try (FSDataInputStream is = fs.open(cfgPath)) { props.load(is); + // validate checksum for latest table version + if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) { + LOG.warn("Checksum validation failed. Falling back to backed up configs."); + try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) { + props.load(fsDataInputStream); + } + } } catch (IOException ioe) { if (!fs.exists(cfgPath)) { LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs."); // try the backup. this way no query ever fails if update fails midway. 
- Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP); try (FSDataInputStream is = fs.open(backupCfgPath)) { props.load(is); } @@ -284,15 +310,31 @@ public class HoodieTableConfig extends HoodieConfig { /// 2. delete the properties file, reads will go to the backup, until we are done. fs.delete(cfgPath, false); // 3. read current props, upsert and save back. + String checksum; try (FSDataInputStream in = fs.open(backupCfgPath); FSDataOutputStream out = fs.create(cfgPath, true)) { - Properties props = new Properties(); + Properties props = new TypedProperties(); props.load(in); modifyFn.accept(props, modifyProps); + if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) { + checksum = props.getProperty(TABLE_CHECKSUM.key()); + } else { + checksum = String.valueOf(generateChecksum(props)); + props.setProperty(TABLE_CHECKSUM.key(), checksum); + } props.store(out, "Updated at " + System.currentTimeMillis()); } // 4. verify and remove backup. - // FIXME(vc): generate a hash for verification. 
+ try (FSDataInputStream in = fs.open(cfgPath)) { + Properties props = new TypedProperties(); + props.load(in); + if (!props.containsKey(TABLE_CHECKSUM.key()) || !props.getProperty(TABLE_CHECKSUM.key()).equals(checksum)) { + // delete the properties file and throw exception indicating update failure + // subsequent writes will recover and update, reads will go to the backup until then + fs.delete(cfgPath, false); + throw new HoodieIOException("Checksum property missing or does not match."); + } + } fs.delete(backupCfgPath, false); } catch (IOException e) { throw new HoodieIOException("Error updating table configs.", e); @@ -343,10 +385,28 @@ public class HoodieTableConfig extends HoodieConfig { if (hoodieConfig.contains(TIMELINE_TIMEZONE)) { HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE))); } + if (hoodieConfig.contains(TABLE_CHECKSUM)) { + hoodieConfig.setValue(TABLE_CHECKSUM, hoodieConfig.getString(TABLE_CHECKSUM)); + } else { + hoodieConfig.setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(hoodieConfig.getProps()))); + } hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); } } + public static long generateChecksum(Properties props) { + if (!props.containsKey(NAME.key())) { + throw new IllegalArgumentException(NAME.key() + " property needs to be specified"); + } + String table = props.getProperty(NAME.key()); + String database = props.getProperty(DATABASE_NAME.key(), ""); + return BinaryUtil.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); + } + + public static boolean validateChecksum(Properties props) { + return String.valueOf(generateChecksum(props)).equals(props.getProperty(TABLE_CHECKSUM.key())); // missing or malformed stored value yields false, not NumberFormatException; writers in this patch store String.valueOf(long) + } + /** * Read the table type from the table properties and if not found, return the default.
*/ @@ -505,6 +565,13 @@ public class HoodieTableConfig extends HoodieConfig { return getString(URL_ENCODE_PARTITIONING); } + /** + * Read the table checksum. + */ + private Long getTableChecksum() { + return getLong(TABLE_CHECKSUM); + } + public Map propsMap() { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java index 122c38775..3a249689a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java @@ -34,7 +34,9 @@ public enum HoodieTableVersion { // 0.9.0 onwards TWO(2), // 0.10.0 onwards - THREE(3); + THREE(3), + // 0.11.0 onwards + FOUR(4); private final int versionCode; @@ -47,7 +49,7 @@ public enum HoodieTableVersion { } public static HoodieTableVersion current() { - return THREE; + return FOUR; } public static HoodieTableVersion versionFromCode(int versionCode) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java index 0c7e89895..9fec2c8cf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.util; import java.nio.charset.Charset; +import java.util.zip.CRC32; public class BinaryUtil { @@ -187,5 +188,14 @@ public class BinaryUtil { } return temp; } + + /** + * Generate a checksum for a given set of bytes. 
+ */ + public static long generateChecksum(byte[] data) { + CRC32 crc = new CRC32(); + crc.update(data); + return crc.getValue(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index b38eeba3f..9ded41543 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -33,9 +33,9 @@ import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.zip.CRC32; import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; +import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; /** * A utility class supports spillable map. @@ -95,15 +95,6 @@ public class SpillableMapUtils { return outputStream.getSize(); } - /** - * Generate a checksum for a given set of bytes. - */ - public static long generateChecksum(byte[] data) { - CRC32 crc = new CRC32(); - crc.update(data); - return crc.getValue(); - } - /** * Compute a bytes representation of the payload by serializing the contents This is used to estimate the size of the * payload (either in memory or when written to disk). diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 289901df8..9fb0b20e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -56,6 +56,8 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; +import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; + /** * This class provides a disk spillable only map implementation. 
All of the data is currenly written to one file, * without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata. @@ -223,7 +225,7 @@ public final class BitCaskDiskMap org.awaitility awaitility - 3.1.2 test diff --git a/pom.xml b/pom.xml index 8ebf7a9ef..fa1ee3503 100644 --- a/pom.xml +++ b/pom.xml @@ -1119,6 +1119,12 @@ awaitility ${awaitility.version} test + + + org.objenesis + objenesis + +