1
0

[HUDI-2809] Introduce a checksum mechanism for validating hoodie.properties (#4712)

Fix dependency conflict

Fix repairs command

Implement putIfAbsent for DDB lock provider

Add upgrade step and validate while fetching configs

Validate checksum for latest table version only while fetching config

Move generateChecksum to BinaryUtil

Rebase and resolve conflict

Fix table version check
This commit is contained in:
Sagar Sumit
2022-02-18 10:17:06 +05:30
committed by GitHub
parent 2844a77b43
commit ed106f671e
20 changed files with 320 additions and 40 deletions

View File

@@ -20,9 +20,14 @@ package org.apache.hudi.common.config;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -30,6 +35,8 @@ import java.util.stream.Collectors;
*/
public class TypedProperties extends Properties implements Serializable {
private final HashSet<Object> keys = new LinkedHashSet<>();
public TypedProperties() {
super(null);
}
@@ -42,6 +49,45 @@ public class TypedProperties extends Properties implements Serializable {
}
}
/**
 * Returns the property names in insertion order.
 *
 * <p>Overridden to enumerate the separately tracked {@code keys} set instead of the
 * hash-ordered backing table, so callers observe keys in the order they were added.
 *
 * <p>Fix: declare the wildcard return type {@code Enumeration<?>} to match the
 * {@link java.util.Properties#propertyNames()} signature instead of the raw type.
 *
 * @return an enumeration of all property names, in insertion order
 */
@Override
public Enumeration<?> propertyNames() {
  return Collections.enumeration(keys);
}
/**
 * Returns all keys in insertion order.
 *
 * <p>Overridden to enumerate the separately tracked {@code keys} set rather than the
 * hash-ordered backing table, so iteration order matches the order keys were added.
 */
@Override
public synchronized Enumeration<Object> keys() {
  return Collections.enumeration(keys);
}
/**
 * Returns the tracked property names as strings, preserving insertion order.
 *
 * <p>Collects into a {@code LinkedHashSet} so the returned set iterates in the same
 * order as the tracked key set. Non-String keys trigger a {@code ClassCastException},
 * exactly as the original cast-per-element loop did.
 */
@Override
public Set<String> stringPropertyNames() {
  return this.keys.stream()
      .map(key -> (String) key)
      .collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
 * Stores the mapping and records the key for ordered iteration.
 *
 * <p>The key is removed from the tracking set before being re-added: for a
 * {@code LinkedHashSet}, remove-then-add repositions an existing key at the end of the
 * iteration order, whereas a plain add would leave it in its original position.
 *
 * @return the previous value for {@code key}, or {@code null} if there was none
 */
@Override
public synchronized Object put(Object key, Object value) {
  keys.remove(key);
  keys.add(key);
  return super.put(key, value);
}
/**
 * Stores the mapping only if the key is not already present, keeping the tracked key
 * set in sync for ordered iteration.
 *
 * <p>Fix: the presence check previously used {@code containsKey(String.valueOf(key))},
 * which never matches a non-String key and diverges from {@link #put}; the raw key is
 * now checked directly. Also annotated with {@code @Override} since this overrides
 * {@code Hashtable.putIfAbsent}.
 *
 * @return the value already associated with {@code key}, or {@code null} if the new
 *         mapping was inserted
 */
@Override
public synchronized Object putIfAbsent(Object key, Object value) {
  if (!containsKey(key)) {
    keys.add(key);
  }
  return super.putIfAbsent(key, value);
}
/**
 * Removes the mapping and drops the key from the insertion-order tracking set.
 *
 * <p>NOTE(review): unlike {@link #put} and {@link #keys()}, this override is not
 * {@code synchronized}; {@code super.remove} is synchronized but the {@code keys}
 * update here is not — confirm concurrent mutation is not expected.
 */
@Override
public Object remove(Object key) {
  keys.remove(key);
  return super.remove(key);
}
private void checkKey(String property) {
if (!containsKey(property)) {
throw new IllegalArgumentException("Property " + property + " not found");

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -58,6 +60,8 @@ import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during
* initializing a path as hoodie base path and never changes during the lifetime of a hoodie table.
@@ -197,6 +201,14 @@ public class HoodieTableConfig extends HoodieConfig {
public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName();
public static final ConfigProperty<String> TABLE_CHECKSUM = ConfigProperty
.key("hoodie.table.checksum")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Table checksum is used to guard against partial writes in HDFS. It is added as the last entry in hoodie.properties and then used to validate while reading table config.");
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
super();
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
@@ -208,6 +220,9 @@ public class HoodieTableConfig extends HoodieConfig {
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
// FIXME(vc): wonder if this can be removed. Need to look into history.
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
if (!isValidChecksum()) {
setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(props)));
}
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
}
}
@@ -218,6 +233,10 @@ public class HoodieTableConfig extends HoodieConfig {
"hoodie.properties file seems invalid. Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry");
}
/**
 * Returns true when the checksum property is present in this config and matches the
 * value recomputed from the current table identity properties.
 */
private boolean isValidChecksum() {
  return contains(TABLE_CHECKSUM) && validateChecksum(props);
}
/**
* For serializing and de-serializing.
*/
@@ -227,13 +246,20 @@ public class HoodieTableConfig extends HoodieConfig {
private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
try (FSDataInputStream is = fs.open(cfgPath)) {
props.load(is);
// validate checksum for latest table version
if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) {
LOG.warn("Checksum validation failed. Falling back to backed up configs.");
try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) {
props.load(fsDataInputStream);
}
}
} catch (IOException ioe) {
if (!fs.exists(cfgPath)) {
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
// try the backup. this way no query ever fails if update fails midway.
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
try (FSDataInputStream is = fs.open(backupCfgPath)) {
props.load(is);
}
@@ -284,15 +310,31 @@ public class HoodieTableConfig extends HoodieConfig {
/// 2. delete the properties file, reads will go to the backup, until we are done.
fs.delete(cfgPath, false);
// 3. read current props, upsert and save back.
String checksum;
try (FSDataInputStream in = fs.open(backupCfgPath);
FSDataOutputStream out = fs.create(cfgPath, true)) {
Properties props = new Properties();
Properties props = new TypedProperties();
props.load(in);
modifyFn.accept(props, modifyProps);
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
checksum = props.getProperty(TABLE_CHECKSUM.key());
} else {
checksum = String.valueOf(generateChecksum(props));
props.setProperty(TABLE_CHECKSUM.key(), checksum);
}
props.store(out, "Updated at " + System.currentTimeMillis());
}
// 4. verify and remove backup.
// FIXME(vc): generate a hash for verification.
try (FSDataInputStream in = fs.open(cfgPath)) {
Properties props = new TypedProperties();
props.load(in);
if (!props.containsKey(TABLE_CHECKSUM.key()) || !props.getProperty(TABLE_CHECKSUM.key()).equals(checksum)) {
// delete the properties file and throw exception indicating update failure
// subsequent writes will recover and update, reads will go to the backup until then
fs.delete(cfgPath, false);
throw new HoodieIOException("Checksum property missing or does not match.");
}
}
fs.delete(backupCfgPath, false);
} catch (IOException e) {
throw new HoodieIOException("Error updating table configs.", e);
@@ -343,10 +385,28 @@ public class HoodieTableConfig extends HoodieConfig {
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
if (hoodieConfig.contains(TABLE_CHECKSUM)) {
hoodieConfig.setValue(TABLE_CHECKSUM, hoodieConfig.getString(TABLE_CHECKSUM));
} else {
hoodieConfig.setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(hoodieConfig.getProps())));
}
hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
}
}
/**
 * Computes the table checksum as a CRC32 over {@code <database_name>.<table_name>}
 * (see {@code TABLE_CHECKSUM_FORMAT}); the database name defaults to empty when unset.
 *
 * <p>Note the checksum covers only the table identity fields, not the full property
 * set, so it guards against partial/corrupt writes of those fields rather than
 * arbitrary property corruption.
 *
 * @param props table properties; must contain the table name
 * @return the CRC32 checksum value
 * @throws IllegalArgumentException if the table name property is missing
 */
public static long generateChecksum(Properties props) {
  if (!props.containsKey(NAME.key())) {
    throw new IllegalArgumentException(NAME.key() + " property needs to be specified");
  }
  String table = props.getProperty(NAME.key());
  String database = props.getProperty(DATABASE_NAME.key(), "");
  return BinaryUtil.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8));
}
/**
 * Validates the stored checksum property against a freshly computed checksum.
 *
 * <p>Fix: a missing or unparseable stored checksum previously surfaced a
 * {@link NumberFormatException} from {@code Long.parseLong}; it is now reported as
 * invalid ({@code false}) so callers fall back to backup configs instead of failing.
 *
 * @param props table properties to validate; must contain the table name
 * @return true if the stored checksum parses and matches the recomputed one
 * @throws IllegalArgumentException if the table name property is missing
 */
public static boolean validateChecksum(Properties props) {
  String storedChecksum = props.getProperty(TABLE_CHECKSUM.key());
  if (storedChecksum == null) {
    return false;
  }
  try {
    return Long.parseLong(storedChecksum) == generateChecksum(props);
  } catch (NumberFormatException e) {
    // Corrupt checksum entry: treat as failed validation rather than propagating.
    return false;
  }
}
/**
* Read the table type from the table properties and if not found, return the default.
*/
@@ -505,6 +565,13 @@ public class HoodieTableConfig extends HoodieConfig {
return getString(URL_ENCODE_PARTITIONING);
}
/**
 * Read the table checksum.
 *
 * @return the stored checksum value from {@code hoodie.table.checksum}
 *         (presumably {@code null} when the property is absent — depends on
 *         {@code getLong} semantics; TODO confirm)
 */
private Long getTableChecksum() {
  return getLong(TABLE_CHECKSUM);
}
public Map<String, String> propsMap() {
return props.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));

View File

@@ -34,7 +34,9 @@ public enum HoodieTableVersion {
// 0.9.0 onwards
TWO(2),
// 0.10.0 onwards
THREE(3);
THREE(3),
// 0.11.0 onwards
FOUR(4);
private final int versionCode;
@@ -47,7 +49,7 @@ public enum HoodieTableVersion {
}
public static HoodieTableVersion current() {
return THREE;
return FOUR;
}
public static HoodieTableVersion versionFromCode(int versionCode) {

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
import java.nio.charset.Charset;
import java.util.zip.CRC32;
public class BinaryUtil {
@@ -187,5 +188,14 @@ public class BinaryUtil {
}
return temp;
}
/**
 * Computes a CRC32 checksum over the given bytes.
 *
 * @param data bytes to checksum
 * @return the CRC32 value of {@code data} (0 for an empty array)
 */
public static long generateChecksum(byte[] data) {
  final CRC32 checksummer = new CRC32();
  checksummer.update(data, 0, data.length);
  return checksummer.getValue();
}
}

View File

@@ -33,9 +33,9 @@ import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.zip.CRC32;
import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString;
import static org.apache.hudi.common.util.BinaryUtil.generateChecksum;
/**
* A utility class supports spillable map.
@@ -95,15 +95,6 @@ public class SpillableMapUtils {
return outputStream.getSize();
}
/**
* Generate a checksum for a given set of bytes.
*/
public static long generateChecksum(byte[] data) {
CRC32 crc = new CRC32();
crc.update(data);
return crc.getValue();
}
/**
* Compute a bytes representation of the payload by serializing the contents This is used to estimate the size of the
* payload (either in memory or when written to disk).

View File

@@ -56,6 +56,8 @@ import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import static org.apache.hudi.common.util.BinaryUtil.generateChecksum;
/**
* This class provides a disk spillable only map implementation. All of the data is currenly written to one file,
* without any rollover support. It uses the following : 1) An in-memory map that tracks the key-> latest ValueMetadata.
@@ -223,7 +225,7 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
new BitCaskDiskMap.ValueMetadata(this.filePath, valueSize, filePosition.get(), timestamp));
byte[] serializedKey = SerializationUtils.serialize(key);
filePosition
.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val),
.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(generateChecksum(val),
serializedKey.length, valueSize, serializedKey, val, timestamp)));
if (flush) {
flushToDisk();