[HUDI-2795] Add mechanism to safely update,delete and recover table properties (#4038)
* [HUDI-2795] Add mechanism to safely update,delete and recover table properties - Fail safe mechanism, that lets queries succeed off a backup file - Readers who are not upgraded to this version of code will just fail until recovery is done. - Added unit tests that exercises all these scenarios. - Adding CLI for recovery, updation to table command. - [Pending] Add some hash based verfication to ensure any rare partial writes for HDFS * Fixing upgrade/downgrade infrastructure to use new updation method
This commit is contained in:
@@ -156,7 +156,7 @@ public class RepairsCommand implements CommandMarker {
|
|||||||
newProps.load(new FileInputStream(new File(overwriteFilePath)));
|
newProps.load(new FileInputStream(new File(overwriteFilePath)));
|
||||||
Map<String, String> oldProps = client.getTableConfig().propsMap();
|
Map<String, String> oldProps = client.getTableConfig().propsMap();
|
||||||
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
|
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
|
||||||
HoodieTableConfig.createHoodieProperties(client.getFs(), metaPathDir, newProps);
|
HoodieTableConfig.create(client.getFs(), metaPathDir, newProps);
|
||||||
|
|
||||||
TreeSet<String> allPropKeys = new TreeSet<>();
|
TreeSet<String> allPropKeys = new TreeSet<>();
|
||||||
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
|
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
|
||||||
|
|||||||
@@ -20,13 +20,16 @@ package org.apache.hudi.cli.commands;
|
|||||||
|
|
||||||
import org.apache.hudi.cli.HoodieCLI;
|
import org.apache.hudi.cli.HoodieCLI;
|
||||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||||
|
import org.apache.hudi.cli.HoodieTableHeaderFields;
|
||||||
import org.apache.hudi.cli.TableHeader;
|
import org.apache.hudi.cli.TableHeader;
|
||||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||||
import org.apache.hudi.exception.TableNotFoundException;
|
import org.apache.hudi.exception.TableNotFoundException;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.springframework.shell.core.CommandMarker;
|
import org.springframework.shell.core.CommandMarker;
|
||||||
@@ -35,12 +38,21 @@ import org.springframework.shell.core.annotation.CliOption;
|
|||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CLI command to display hudi table options.
|
* CLI command to display hudi table options.
|
||||||
@@ -170,6 +182,67 @@ public class TableCommand implements CommandMarker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@CliCommand(value = "table recover-configs", help = "Recover table configs, from update/delete that failed midway.")
|
||||||
|
public String recoverTableConfig() throws IOException {
|
||||||
|
HoodieCLI.refreshTableMetadata();
|
||||||
|
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
||||||
|
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
|
||||||
|
HoodieTableConfig.recover(client.getFs(), metaPathDir);
|
||||||
|
return descTable();
|
||||||
|
}
|
||||||
|
|
||||||
|
@CliCommand(value = "table update-configs", help = "Update the table configs with configs with provided file.")
|
||||||
|
public String updateTableConfig(
|
||||||
|
@CliOption(key = {"props-file"}, mandatory = true, help = "Path to a properties file on local filesystem")
|
||||||
|
final String updatePropsFilePath) throws IOException {
|
||||||
|
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
||||||
|
Map<String, String> oldProps = client.getTableConfig().propsMap();
|
||||||
|
|
||||||
|
Properties updatedProps = new Properties();
|
||||||
|
updatedProps.load(new FileInputStream(updatePropsFilePath));
|
||||||
|
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
|
||||||
|
HoodieTableConfig.update(client.getFs(), metaPathDir, updatedProps);
|
||||||
|
|
||||||
|
HoodieCLI.refreshTableMetadata();
|
||||||
|
Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
|
||||||
|
return renderOldNewProps(newProps, oldProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@CliCommand(value = "table delete-configs", help = "Delete the supplied table configs from the table.")
|
||||||
|
public String deleteTableConfig(
|
||||||
|
@CliOption(key = {"comma-separated-configs"}, mandatory = true, help = "Comma separated list of configs to delete.")
|
||||||
|
final String csConfigs) {
|
||||||
|
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
|
||||||
|
Map<String, String> oldProps = client.getTableConfig().propsMap();
|
||||||
|
|
||||||
|
Set<String> deleteConfigs = Arrays.stream(csConfigs.split(",")).collect(Collectors.toSet());
|
||||||
|
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
|
||||||
|
HoodieTableConfig.delete(client.getFs(), metaPathDir, deleteConfigs);
|
||||||
|
|
||||||
|
HoodieCLI.refreshTableMetadata();
|
||||||
|
Map<String, String> newProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
|
||||||
|
return renderOldNewProps(newProps, oldProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String renderOldNewProps(Map<String, String> newProps, Map<String, String> oldProps) {
|
||||||
|
TreeSet<String> allPropKeys = new TreeSet<>();
|
||||||
|
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
|
||||||
|
allPropKeys.addAll(oldProps.keySet());
|
||||||
|
|
||||||
|
String[][] rows = new String[allPropKeys.size()][];
|
||||||
|
int ind = 0;
|
||||||
|
for (String propKey : allPropKeys) {
|
||||||
|
String[] row = new String[]{
|
||||||
|
propKey,
|
||||||
|
oldProps.getOrDefault(propKey, "null"),
|
||||||
|
newProps.getOrDefault(propKey, "null")
|
||||||
|
};
|
||||||
|
rows[ind++] = row;
|
||||||
|
}
|
||||||
|
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
|
||||||
|
HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use Streams when you are dealing with raw data.
|
* Use Streams when you are dealing with raw data.
|
||||||
* @param filePath output file path.
|
* @param filePath output file path.
|
||||||
|
|||||||
@@ -23,21 +23,16 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
|
|||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||||
import org.apache.hudi.common.util.FileIOUtils;
|
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
|
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
|
* Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
|
||||||
@@ -107,69 +102,38 @@ public class UpgradeDowngrade {
|
|||||||
* @param instantTime current instant time that should not be touched.
|
* @param instantTime current instant time that should not be touched.
|
||||||
*/
|
*/
|
||||||
public void run(HoodieTableVersion toVersion, String instantTime) {
|
public void run(HoodieTableVersion toVersion, String instantTime) {
|
||||||
try {
|
// Fetch version from property file and current version
|
||||||
// Fetch version from property file and current version
|
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
|
||||||
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
|
if (!needsUpgradeOrDowngrade(toVersion)) {
|
||||||
if (!needsUpgradeOrDowngrade(toVersion)) {
|
return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fs.exists(updatedPropsFilePath)) {
|
|
||||||
// this can be left over .updated file from a failed attempt before. Many cases exist here.
|
|
||||||
// a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
|
|
||||||
// b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
|
|
||||||
// c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
|
|
||||||
// All cases, it simply suffices to delete the file and proceed.
|
|
||||||
LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
|
|
||||||
fs.delete(updatedPropsFilePath, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
|
|
||||||
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
|
|
||||||
Map<ConfigProperty, String> tableProps = new HashMap<>();
|
|
||||||
if (fromVersion.versionCode() < toVersion.versionCode()) {
|
|
||||||
// upgrade
|
|
||||||
while (fromVersion.versionCode() < toVersion.versionCode()) {
|
|
||||||
HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
|
|
||||||
tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
|
|
||||||
fromVersion = nextVersion;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// downgrade
|
|
||||||
while (fromVersion.versionCode() > toVersion.versionCode()) {
|
|
||||||
HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
|
|
||||||
tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
|
|
||||||
fromVersion = prevVersion;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write out the current version in hoodie.properties.updated file
|
|
||||||
for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
|
|
||||||
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
metaClient.getTableConfig().setTableVersion(toVersion);
|
|
||||||
createUpdatedFile(metaClient.getTableConfig().getProps());
|
|
||||||
|
|
||||||
// because for different fs the fs.rename have different action,such as:
|
|
||||||
// a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
|
|
||||||
// b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
|
|
||||||
// c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
|
|
||||||
// so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
|
|
||||||
if (fs.exists(propsFilePath)) {
|
|
||||||
fs.delete(propsFilePath, false);
|
|
||||||
}
|
|
||||||
// Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
|
|
||||||
// But as long as this does not leave a partial hoodie.properties file, we are okay.
|
|
||||||
fs.rename(updatedPropsFilePath, propsFilePath);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private void createUpdatedFile(Properties props) throws IOException {
|
// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
|
||||||
try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
|
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
|
||||||
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
Map<ConfigProperty, String> tableProps = new HashMap<>();
|
||||||
|
if (fromVersion.versionCode() < toVersion.versionCode()) {
|
||||||
|
// upgrade
|
||||||
|
while (fromVersion.versionCode() < toVersion.versionCode()) {
|
||||||
|
HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
|
||||||
|
tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
|
||||||
|
fromVersion = nextVersion;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// downgrade
|
||||||
|
while (fromVersion.versionCode() > toVersion.versionCode()) {
|
||||||
|
HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
|
||||||
|
tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
|
||||||
|
fromVersion = prevVersion;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write out the current version in hoodie.properties.updated file
|
||||||
|
for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
|
||||||
|
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
metaClient.getTableConfig().setTableVersion(toVersion);
|
||||||
|
|
||||||
|
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
|||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
||||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||||
|
import org.apache.hudi.common.util.FileIOUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.ValidationUtils;
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
@@ -47,6 +48,8 @@ import java.util.Arrays;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -69,6 +72,7 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
private static final Logger LOG = LogManager.getLogger(HoodieTableConfig.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieTableConfig.class);
|
||||||
|
|
||||||
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
|
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
|
||||||
|
public static final String HOODIE_PROPERTIES_FILE_BACKUP = "hoodie.properties.backup";
|
||||||
|
|
||||||
public static final ConfigProperty<String> NAME = ConfigProperty
|
public static final ConfigProperty<String> NAME = ConfigProperty
|
||||||
.key("hoodie.table.name")
|
.key("hoodie.table.name")
|
||||||
@@ -173,12 +177,11 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
|
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
|
||||||
LOG.info("Loading table properties from " + propertyPath);
|
LOG.info("Loading table properties from " + propertyPath);
|
||||||
try {
|
try {
|
||||||
try (FSDataInputStream inputStream = fs.open(propertyPath)) {
|
fetchConfigs(fs, metaPath);
|
||||||
props.load(inputStream);
|
|
||||||
}
|
|
||||||
if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
|
if (contains(PAYLOAD_CLASS_NAME) && payloadClassName != null
|
||||||
&& !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
|
&& !getString(PAYLOAD_CLASS_NAME).equals(payloadClassName)) {
|
||||||
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
|
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
|
||||||
|
// FIXME(vc): wonder if this can be removed. Need to look into history.
|
||||||
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
|
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
|
||||||
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
||||||
}
|
}
|
||||||
@@ -192,16 +195,103 @@ public class HoodieTableConfig extends HoodieConfig {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* For serializing and de-serializing.
|
* For serializing and de-serializing.
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public HoodieTableConfig() {
|
public HoodieTableConfig() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
|
||||||
|
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
|
||||||
|
try (FSDataInputStream is = fs.open(cfgPath)) {
|
||||||
|
props.load(is);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
if (!fs.exists(cfgPath)) {
|
||||||
|
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
|
||||||
|
// try the backup. this way no query ever fails if update fails midway.
|
||||||
|
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
|
||||||
|
try (FSDataInputStream is = fs.open(backupCfgPath)) {
|
||||||
|
props.load(is);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void recover(FileSystem fs, Path metadataFolder) throws IOException {
|
||||||
|
Path cfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE);
|
||||||
|
Path backupCfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE_BACKUP);
|
||||||
|
recoverIfNeeded(fs, cfgPath, backupCfgPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void recoverIfNeeded(FileSystem fs, Path cfgPath, Path backupCfgPath) throws IOException {
|
||||||
|
if (!fs.exists(cfgPath)) {
|
||||||
|
// copy over from backup
|
||||||
|
try (FSDataInputStream in = fs.open(backupCfgPath);
|
||||||
|
FSDataOutputStream out = fs.create(cfgPath, false)) {
|
||||||
|
FileIOUtils.copy(in, out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// regardless, we don't need the backup anymore.
|
||||||
|
fs.delete(backupCfgPath, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void upsertProperties(Properties current, Properties updated) {
|
||||||
|
updated.forEach((k, v) -> current.setProperty(k.toString(), v.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void deleteProperties(Properties current, Properties deleted) {
|
||||||
|
deleted.forEach((k, v) -> current.remove(k.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void modify(FileSystem fs, Path metadataFolder, Properties modifyProps, BiConsumer<Properties, Properties> modifyFn) {
|
||||||
|
Path cfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE);
|
||||||
|
Path backupCfgPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE_BACKUP);
|
||||||
|
try {
|
||||||
|
// 0. do any recovery from prior attempts.
|
||||||
|
recoverIfNeeded(fs, cfgPath, backupCfgPath);
|
||||||
|
|
||||||
|
// 1. backup the existing properties.
|
||||||
|
try (FSDataInputStream in = fs.open(cfgPath);
|
||||||
|
FSDataOutputStream out = fs.create(backupCfgPath, false)) {
|
||||||
|
FileIOUtils.copy(in, out);
|
||||||
|
}
|
||||||
|
/// 2. delete the properties file, reads will go to the backup, until we are done.
|
||||||
|
fs.delete(cfgPath, false);
|
||||||
|
// 3. read current props, upsert and save back.
|
||||||
|
try (FSDataInputStream in = fs.open(backupCfgPath);
|
||||||
|
FSDataOutputStream out = fs.create(cfgPath, true)) {
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.load(in);
|
||||||
|
modifyFn.accept(props, modifyProps);
|
||||||
|
props.store(out, "Updated at " + System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
// 4. verify and remove backup.
|
||||||
|
// FIXME(vc): generate a hash for verification.
|
||||||
|
fs.delete(backupCfgPath, false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Error updating table configs.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Upserts the table config with the set of properties passed in. We implement a fail-safe backup protocol
|
||||||
|
* here for safely updating with recovery and also ensuring the table config continues to be readable.
|
||||||
|
*/
|
||||||
|
public static void update(FileSystem fs, Path metadataFolder, Properties updatedProps) {
|
||||||
|
modify(fs, metadataFolder, updatedProps, HoodieTableConfig::upsertProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void delete(FileSystem fs, Path metadataFolder, Set<String> deletedProps) {
|
||||||
|
Properties props = new Properties();
|
||||||
|
deletedProps.forEach(p -> props.setProperty(p, ""));
|
||||||
|
modify(fs, metadataFolder, props, HoodieTableConfig::deleteProperties);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the hoodie meta directory and any necessary files inside the meta (including the hoodie.properties).
|
* Initialize the hoodie meta directory and any necessary files inside the meta (including the hoodie.properties).
|
||||||
*/
|
*/
|
||||||
public static void createHoodieProperties(FileSystem fs, Path metadataFolder, Properties properties)
|
public static void create(FileSystem fs, Path metadataFolder, Properties properties)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (!fs.exists(metadataFolder)) {
|
if (!fs.exists(metadataFolder)) {
|
||||||
fs.mkdirs(metadataFolder);
|
fs.mkdirs(metadataFolder);
|
||||||
|
|||||||
@@ -377,7 +377,7 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
|
initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
|
||||||
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
|
HoodieTableConfig.create(fs, metaPathDir, props);
|
||||||
// We should not use fs.getConf as this might be different from the original configuration
|
// We should not use fs.getConf as this might be different from the original configuration
|
||||||
// used to create the fs in unit tests
|
// used to create the fs in unit tests
|
||||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
|
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
|
||||||
|
|||||||
@@ -94,7 +94,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
|
|||||||
props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), "false");
|
props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), "false");
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.putAll(props);
|
properties.putAll(props);
|
||||||
HoodieTableConfig.createHoodieProperties(metaClient.getFs(), new Path(metaClient.getMetaPath()), properties);
|
HoodieTableConfig.create(metaClient.getFs(), new Path(metaClient.getMetaPath()), properties);
|
||||||
|
|
||||||
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
|
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
|
||||||
BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
|
BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
|
||||||
|
|||||||
@@ -0,0 +1,137 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common.table;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||||
|
import org.apache.hudi.common.util.CollectionUtils;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class TestHoodieTableConfig extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path metaPath;
|
||||||
|
private Path cfgPath;
|
||||||
|
private Path backupCfgPath;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
initPath();
|
||||||
|
fs = new Path(basePath).getFileSystem(new Configuration());
|
||||||
|
metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.setProperty(HoodieTableConfig.NAME.key(), "test-table");
|
||||||
|
HoodieTableConfig.create(fs, metaPath, props);
|
||||||
|
cfgPath = new Path(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE);
|
||||||
|
backupCfgPath = new Path(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE_BACKUP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreate() throws IOException {
|
||||||
|
assertTrue(fs.exists(new Path(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE)));
|
||||||
|
HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
assertEquals(4, config.getProps().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdate() throws IOException {
|
||||||
|
Properties updatedProps = new Properties();
|
||||||
|
updatedProps.setProperty(HoodieTableConfig.NAME.key(), "test-table2");
|
||||||
|
updatedProps.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key(), "new_field");
|
||||||
|
HoodieTableConfig.update(fs, metaPath, updatedProps);
|
||||||
|
|
||||||
|
assertTrue(fs.exists(cfgPath));
|
||||||
|
assertFalse(fs.exists(backupCfgPath));
|
||||||
|
HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
assertEquals(5, config.getProps().size());
|
||||||
|
assertEquals("test-table2", config.getTableName());
|
||||||
|
assertEquals("new_field", config.getPreCombineField());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelete() throws IOException {
|
||||||
|
Set<String> deletedProps = CollectionUtils.createSet(HoodieTableConfig.ARCHIVELOG_FOLDER.key(), "hoodie.invalid.config");
|
||||||
|
HoodieTableConfig.delete(fs, metaPath, deletedProps);
|
||||||
|
|
||||||
|
assertTrue(fs.exists(cfgPath));
|
||||||
|
assertFalse(fs.exists(backupCfgPath));
|
||||||
|
HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
assertEquals(3, config.getProps().size());
|
||||||
|
assertNull(config.getProps().getProperty("hoodie.invalid.config"));
|
||||||
|
assertFalse(config.getProps().contains(HoodieTableConfig.ARCHIVELOG_FOLDER.key()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadsWhenPropsFileDoesNotExist() throws IOException {
|
||||||
|
fs.delete(cfgPath, false);
|
||||||
|
assertThrows(HoodieIOException.class, () -> {
|
||||||
|
new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadsWithUpdateFailures() throws IOException {
|
||||||
|
HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
fs.delete(cfgPath, false);
|
||||||
|
try (FSDataOutputStream out = fs.create(backupCfgPath)) {
|
||||||
|
config.getProps().store(out, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse(fs.exists(cfgPath));
|
||||||
|
assertTrue(fs.exists(backupCfgPath));
|
||||||
|
config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
assertEquals(4, config.getProps().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {true, false})
|
||||||
|
public void testUpdateRecovery(boolean shouldPropsFileExist) throws IOException {
|
||||||
|
HoodieTableConfig config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
if (!shouldPropsFileExist) {
|
||||||
|
fs.delete(cfgPath, false);
|
||||||
|
}
|
||||||
|
try (FSDataOutputStream out = fs.create(backupCfgPath)) {
|
||||||
|
config.getProps().store(out, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
HoodieTableConfig.recoverIfNeeded(fs, cfgPath, backupCfgPath);
|
||||||
|
assertTrue(fs.exists(cfgPath));
|
||||||
|
assertFalse(fs.exists(backupCfgPath));
|
||||||
|
config = new HoodieTableConfig(fs, metaPath.toString(), null);
|
||||||
|
assertEquals(4, config.getProps().size());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user