[HUDI-2513] Refactor table upgrade and downgrade actions in hudi-client module (#3743)
This commit is contained in:
@@ -38,7 +38,8 @@ import org.apache.hudi.exception.HoodieSavepointException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorType;
|
||||
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
|
||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||
import org.apache.hudi.utilities.HDFSParquetImporter;
|
||||
import org.apache.hudi.utilities.HDFSParquetImporter.Config;
|
||||
import org.apache.hudi.utilities.HoodieCleaner;
|
||||
@@ -453,7 +454,8 @@ public class SparkMain {
|
||||
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
|
||||
try {
|
||||
new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
|
||||
new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.valueOf(toVersion), null);
|
||||
LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -1,164 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
|
||||
*/
|
||||
public abstract class AbstractUpgradeDowngrade {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AbstractUpgradeDowngrade.class);
|
||||
public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";
|
||||
|
||||
private HoodieTableMetaClient metaClient;
|
||||
protected HoodieWriteConfig config;
|
||||
protected HoodieEngineContext context;
|
||||
private transient FileSystem fs;
|
||||
private Path updatedPropsFilePath;
|
||||
private Path propsFilePath;
|
||||
|
||||
/**
|
||||
* Perform Upgrade or Downgrade steps if required and updated table version if need be.
|
||||
* <p>
|
||||
* Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
|
||||
*
|
||||
* Essentially, if a dataset was created using any pre 0.6.0(for eg 0.5.3), and Hoodie version was upgraded to 0.6.0,
|
||||
* Hoodie table version gets bumped to 1 and there are some upgrade steps need to be executed before doing any writes.
|
||||
* Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded
|
||||
* to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes.
|
||||
*
|
||||
* On a high level, these are the steps performed
|
||||
*
|
||||
* Step1 : Understand current hoodie table version and table version from hoodie.properties file
|
||||
* Step2 : Delete any left over .updated from previous upgrade/downgrade
|
||||
* Step3 : If version are different, perform upgrade/downgrade.
|
||||
* Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated
|
||||
* Step6 : Rename hoodie.properties.updated to hoodie.properties
|
||||
* </p>
|
||||
*
|
||||
* @param metaClient instance of {@link HoodieTableMetaClient} to use
|
||||
* @param toVersion version to which upgrade or downgrade has to be done.
|
||||
* @param config instance of {@link HoodieWriteConfig} to use.
|
||||
* @param context instance of {@link HoodieEngineContext} to use.
|
||||
* @param instantTime current instant time that should not be touched.
|
||||
*/
|
||||
public abstract void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config,
|
||||
HoodieEngineContext context, String instantTime);
|
||||
|
||||
public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
|
||||
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
|
||||
// Ensure no inflight commits & versions are same
|
||||
return toVersion.versionCode() != fromVersion.versionCode();
|
||||
}
|
||||
|
||||
protected AbstractUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
this.metaClient = metaClient;
|
||||
this.config = config;
|
||||
this.context = context;
|
||||
this.fs = metaClient.getFs();
|
||||
this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
|
||||
this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
|
||||
}
|
||||
|
||||
protected void run(HoodieTableVersion toVersion, String instantTime) throws IOException {
|
||||
// Fetch version from property file and current version
|
||||
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
|
||||
if (!needsUpgradeOrDowngrade(toVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (fs.exists(updatedPropsFilePath)) {
|
||||
// this can be left over .updated file from a failed attempt before. Many cases exist here.
|
||||
// a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
|
||||
// b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
|
||||
// c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
|
||||
// All cases, it simply suffices to delete the file and proceed.
|
||||
LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
|
||||
fs.delete(updatedPropsFilePath, false);
|
||||
}
|
||||
|
||||
// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
|
||||
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
|
||||
Map<ConfigProperty, String> tableProps = new HashMap<>();
|
||||
if (fromVersion.versionCode() < toVersion.versionCode()) {
|
||||
// upgrade
|
||||
while (fromVersion.versionCode() < toVersion.versionCode()) {
|
||||
HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
|
||||
tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
|
||||
fromVersion = nextVersion;
|
||||
}
|
||||
} else {
|
||||
// downgrade
|
||||
while (fromVersion.versionCode() > toVersion.versionCode()) {
|
||||
HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
|
||||
tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
|
||||
fromVersion = prevVersion;
|
||||
}
|
||||
}
|
||||
|
||||
// Write out the current version in hoodie.properties.updated file
|
||||
for (Map.Entry<ConfigProperty, String> entry: tableProps.entrySet()) {
|
||||
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
|
||||
}
|
||||
metaClient.getTableConfig().setTableVersion(toVersion);
|
||||
createUpdatedFile(metaClient.getTableConfig().getProps());
|
||||
|
||||
// because for different fs the fs.rename have different action,such as:
|
||||
// a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
|
||||
// b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
|
||||
// c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
|
||||
// so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
|
||||
if (fs.exists(propsFilePath)) {
|
||||
fs.delete(propsFilePath, false);
|
||||
}
|
||||
// Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
|
||||
// But as long as this does not leave a partial hoodie.properties file, we are okay.
|
||||
fs.rename(updatedPropsFilePath, propsFilePath);
|
||||
}
|
||||
|
||||
private void createUpdatedFile(Properties props) throws IOException {
|
||||
try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
|
||||
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
|
||||
|
||||
protected abstract Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime);
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
/**
|
||||
* Interface for engine-specific logic needed for upgrade and downgrade actions.
|
||||
*/
|
||||
public interface BaseUpgradeDowngradeHelper {
|
||||
/**
|
||||
* @param config Write config.
|
||||
* @param context {@link HoodieEngineContext} instance to use.
|
||||
* @return A new Hudi table for upgrade and downgrade actions.
|
||||
*/
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
|
||||
|
||||
/**
|
||||
* @param config Write config.
|
||||
* @return partition columns in String.
|
||||
*/
|
||||
String getPartitionColumns(HoodieWriteConfig config);
|
||||
}
|
||||
@@ -32,10 +32,13 @@ public interface DowngradeHandler {
|
||||
/**
|
||||
* to be invoked to downgrade hoodie table from one version to a lower version.
|
||||
*
|
||||
* @param config instance of {@link HoodieWriteConfig} to be used.
|
||||
* @param context instance of {@link HoodieEngineContext} to be used.
|
||||
* @param instantTime current instant time that should not touched.
|
||||
* @param config instance of {@link HoodieWriteConfig} to be used.
|
||||
* @param context instance of {@link HoodieEngineContext} to be used.
|
||||
* @param instantTime current instant time that should not touched.
|
||||
* @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used.
|
||||
* @return Map of config properties and its values to be added to table properties.
|
||||
*/
|
||||
Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
|
||||
Map<ConfigProperty, String> downgrade(
|
||||
HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper);
|
||||
}
|
||||
|
||||
@@ -27,16 +27,19 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class BaseOneToTwoUpgradeHandler implements UpgradeHandler {
|
||||
/**
|
||||
* Upgrade handle to assist in upgrading hoodie table from version 1 to 2.
|
||||
*/
|
||||
public class OneToTwoUpgradeHandler implements UpgradeHandler {
|
||||
|
||||
@Override
|
||||
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
public Map<ConfigProperty, String> upgrade(
|
||||
HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
|
||||
tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, getPartitionColumns(config));
|
||||
tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, upgradeDowngradeHelper.getPartitionColumns(config));
|
||||
tablePropsToAdd.put(HoodieTableConfig.RECORDKEY_FIELDS, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
|
||||
tablePropsToAdd.put(HoodieTableConfig.BASE_FILE_FORMAT, config.getString(HoodieTableConfig.BASE_FILE_FORMAT));
|
||||
return tablePropsToAdd;
|
||||
}
|
||||
|
||||
abstract String getPartitionColumns(HoodieWriteConfig config);
|
||||
}
|
||||
@@ -32,12 +32,17 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler {
|
||||
/**
|
||||
* Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
|
||||
*/
|
||||
public class OneToZeroDowngradeHandler implements DowngradeHandler {
|
||||
|
||||
@Override
|
||||
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
public Map<ConfigProperty, String> downgrade(
|
||||
HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
|
||||
// fetch pending commit info
|
||||
HoodieTable table = getTable(config, context);
|
||||
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
|
||||
List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
|
||||
for (HoodieInstant inflightInstant : commits) {
|
||||
@@ -47,6 +52,4 @@ public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler
|
||||
}
|
||||
return Collections.EMPTY_MAP;
|
||||
}
|
||||
|
||||
abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
|
||||
}
|
||||
@@ -7,13 +7,14 @@
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
@@ -32,7 +33,7 @@ import java.util.Map;
|
||||
public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
|
||||
|
||||
@Override
|
||||
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
if (config.isMetadataTableEnabled()) {
|
||||
// Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous
|
||||
// removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the
|
||||
@@ -46,12 +46,16 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
|
||||
|
||||
public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
|
||||
/**
|
||||
* Downgrade handler to assist in downgrading hoodie table from version 2 to 1.
|
||||
*/
|
||||
public class TwoToOneDowngradeHandler implements DowngradeHandler {
|
||||
|
||||
@Override
|
||||
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
HoodieTable table = getTable(config, context);
|
||||
public Map<ConfigProperty, String> downgrade(
|
||||
HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
|
||||
HoodieTableMetaClient metaClient = table.getMetaClient();
|
||||
|
||||
// re-create marker files if any partial timeline server based markers are found
|
||||
@@ -69,8 +73,6 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
|
||||
return Collections.EMPTY_MAP;
|
||||
}
|
||||
|
||||
abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
|
||||
|
||||
/**
|
||||
* Converts the markers in new format(timeline server based) to old format of direct markers,
|
||||
* i.e., one marker file per data file, without MARKERS.type file.
|
||||
@@ -106,8 +108,7 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
|
||||
// Deletes marker type file
|
||||
MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
|
||||
// Deletes timeline server based markers
|
||||
deleteTimelineBasedMarkerFiles(
|
||||
context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
|
||||
deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism);
|
||||
break;
|
||||
default:
|
||||
throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
|
||||
@@ -116,8 +117,7 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
|
||||
} else {
|
||||
// In case of partial failures during downgrade, there is a chance that marker type file was deleted,
|
||||
// but timeline server based marker files are left. So deletes them if any
|
||||
deleteTimelineBasedMarkerFiles(
|
||||
context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
|
||||
deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,13 +7,14 @@
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
@@ -31,7 +32,7 @@ import java.util.Map;
|
||||
*/
|
||||
public class TwoToThreeUpgradeHandler implements UpgradeHandler {
|
||||
@Override
|
||||
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
if (config.isMetadataTableEnabled()) {
|
||||
// Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not
|
||||
// sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the
|
||||
@@ -0,0 +1,198 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
|
||||
*/
|
||||
public class UpgradeDowngrade {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class);
|
||||
public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";
|
||||
|
||||
private final BaseUpgradeDowngradeHelper upgradeDowngradeHelper;
|
||||
private HoodieTableMetaClient metaClient;
|
||||
protected HoodieWriteConfig config;
|
||||
protected HoodieEngineContext context;
|
||||
private transient FileSystem fs;
|
||||
private Path updatedPropsFilePath;
|
||||
private Path propsFilePath;
|
||||
|
||||
public UpgradeDowngrade(
|
||||
HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
this.metaClient = metaClient;
|
||||
this.config = config;
|
||||
this.context = context;
|
||||
this.fs = metaClient.getFs();
|
||||
this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
|
||||
this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
|
||||
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
|
||||
}
|
||||
|
||||
public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
|
||||
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
|
||||
// Ensure versions are same
|
||||
return toVersion.versionCode() != fromVersion.versionCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform Upgrade or Downgrade steps if required and updated table version if need be.
|
||||
* <p>
|
||||
* Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
|
||||
* <p>
|
||||
* Essentially, if a dataset was created using an previous table version in an older release,
|
||||
* and Hoodie version was upgraded to a new release with new table version supported,
|
||||
* Hoodie table version gets bumped to the new version and there are some upgrade steps need
|
||||
* to be executed before doing any writes.
|
||||
* <p>
|
||||
* Similarly, if a dataset was created using an newer table version in an newer release,
|
||||
* and then hoodie was downgraded to an older release or to older Hoodie table version,
|
||||
* then some downgrade steps need to be executed before proceeding w/ any writes.
|
||||
* <p>
|
||||
* Below shows the table version corresponding to the Hudi release:
|
||||
* Hudi release -> table version
|
||||
* pre 0.6.0 -> v0
|
||||
* 0.6.0 to 0.8.0 -> v1
|
||||
* 0.9.0 -> v2
|
||||
* 0.10.0 to current -> v3
|
||||
* <p>
|
||||
* On a high level, these are the steps performed
|
||||
* <p>
|
||||
* Step1 : Understand current hoodie table version and table version from hoodie.properties file
|
||||
* Step2 : Delete any left over .updated from previous upgrade/downgrade
|
||||
* Step3 : If version are different, perform upgrade/downgrade.
|
||||
* Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated
|
||||
* Step6 : Rename hoodie.properties.updated to hoodie.properties
|
||||
* </p>
|
||||
*
|
||||
* @param toVersion version to which upgrade or downgrade has to be done.
|
||||
* @param instantTime current instant time that should not be touched.
|
||||
*/
|
||||
public void run(HoodieTableVersion toVersion, String instantTime) {
|
||||
try {
|
||||
// Fetch version from property file and current version
|
||||
HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
|
||||
if (!needsUpgradeOrDowngrade(toVersion)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (fs.exists(updatedPropsFilePath)) {
|
||||
// this can be left over .updated file from a failed attempt before. Many cases exist here.
|
||||
// a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
|
||||
// b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
|
||||
// c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
|
||||
// All cases, it simply suffices to delete the file and proceed.
|
||||
LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
|
||||
fs.delete(updatedPropsFilePath, false);
|
||||
}
|
||||
|
||||
// Perform the actual upgrade/downgrade; this has to be idempotent, for now.
|
||||
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
|
||||
Map<ConfigProperty, String> tableProps = new HashMap<>();
|
||||
if (fromVersion.versionCode() < toVersion.versionCode()) {
|
||||
// upgrade
|
||||
while (fromVersion.versionCode() < toVersion.versionCode()) {
|
||||
HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
|
||||
tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
|
||||
fromVersion = nextVersion;
|
||||
}
|
||||
} else {
|
||||
// downgrade
|
||||
while (fromVersion.versionCode() > toVersion.versionCode()) {
|
||||
HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
|
||||
tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
|
||||
fromVersion = prevVersion;
|
||||
}
|
||||
}
|
||||
|
||||
// Write out the current version in hoodie.properties.updated file
|
||||
for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
|
||||
metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
|
||||
}
|
||||
metaClient.getTableConfig().setTableVersion(toVersion);
|
||||
createUpdatedFile(metaClient.getTableConfig().getProps());
|
||||
|
||||
// because for different fs the fs.rename have different action,such as:
|
||||
// a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
|
||||
// b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
|
||||
// c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
|
||||
// so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
|
||||
if (fs.exists(propsFilePath)) {
|
||||
fs.delete(propsFilePath, false);
|
||||
}
|
||||
// Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
|
||||
// But as long as this does not leave a partial hoodie.properties file, we are okay.
|
||||
fs.rename(updatedPropsFilePath, propsFilePath);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void createUpdatedFile(Properties props) throws IOException {
|
||||
try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
|
||||
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||
if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
|
||||
return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
|
||||
} else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
|
||||
return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
|
||||
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
|
||||
return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
|
||||
} else {
|
||||
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||
if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
|
||||
return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
|
||||
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
|
||||
return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
|
||||
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
|
||||
return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
|
||||
} else {
|
||||
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,10 +32,13 @@ public interface UpgradeHandler {
|
||||
/**
|
||||
* to be invoked to upgrade hoodie table from one version to a higher version.
|
||||
*
|
||||
* @param config instance of {@link HoodieWriteConfig} to be used.
|
||||
* @param context instance of {@link HoodieEngineContext} to be used.
|
||||
* @param instantTime current instant time that should not be touched.
|
||||
* @param config instance of {@link HoodieWriteConfig} to be used.
|
||||
* @param context instance of {@link HoodieEngineContext} to be used.
|
||||
* @param instantTime current instant time that should not be touched.
|
||||
* @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used.
|
||||
* @return Map of config properties and its values to be added to table properties.
|
||||
*/
|
||||
Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime);
|
||||
Map<ConfigProperty, String> upgrade(
|
||||
HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper);
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackRequest;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
@@ -33,6 +34,8 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
|
||||
import org.apache.hudi.table.action.rollback.RollbackUtils;
|
||||
import org.apache.hudi.table.marker.WriteMarkers;
|
||||
@@ -46,13 +49,17 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
|
||||
/**
|
||||
* Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
|
||||
*/
|
||||
public class ZeroToOneUpgradeHandler implements UpgradeHandler {
|
||||
|
||||
@Override
|
||||
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
public Map<ConfigProperty, String> upgrade(
|
||||
HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
|
||||
BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
|
||||
// fetch pending commit info
|
||||
//HoodieSparkTable table = HoodieSparkTable.create(config, context);
|
||||
HoodieTable table = getTable(config, context);
|
||||
HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
|
||||
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
|
||||
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toList());
|
||||
@@ -67,8 +74,6 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
|
||||
return Collections.EMPTY_MAP;
|
||||
}
|
||||
|
||||
abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context);
|
||||
|
||||
/**
|
||||
* Recreate markers in new format.
|
||||
* Step1: Delete existing markers
|
||||
@@ -76,14 +81,14 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
|
||||
* Step3: recreate markers for all interested files.
|
||||
*
|
||||
* @param commitInstantTime instant of interest for which markers need to be recreated.
|
||||
* @param table instance of {@link HoodieTable} to use
|
||||
* @param context instance of {@link HoodieEngineContext} to use
|
||||
* @param table instance of {@link HoodieTable} to use
|
||||
* @param context instance of {@link HoodieEngineContext} to use
|
||||
* @throws HoodieRollbackException on any exception during upgrade.
|
||||
*/
|
||||
protected void recreateMarkers(final String commitInstantTime,
|
||||
HoodieTable table,
|
||||
HoodieEngineContext context,
|
||||
int parallelism) throws HoodieRollbackException {
|
||||
HoodieTable table,
|
||||
HoodieEngineContext context,
|
||||
int parallelism) throws HoodieRollbackException {
|
||||
try {
|
||||
// fetch hoodie instant
|
||||
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
|
||||
@@ -121,9 +126,13 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
|
||||
}
|
||||
}
|
||||
|
||||
abstract List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
|
||||
HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
|
||||
List<ListingBasedRollbackRequest> rollbackRequests);
|
||||
List<HoodieRollbackStat> getListBasedRollBackStats(
|
||||
HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
|
||||
Option<HoodieInstant> commitInstantOpt, List<ListingBasedRollbackRequest> rollbackRequests) {
|
||||
List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
|
||||
.getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
|
||||
return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
|
||||
}
|
||||
|
||||
/**
|
||||
* Curates file name for marker from existing log file path.
|
||||
@@ -131,6 +140,7 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler {
|
||||
* marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND
|
||||
*
|
||||
* @param logFilePath log file path for which marker file name needs to be generated.
|
||||
* @param table {@link HoodieTable} instance to use
|
||||
* @return the marker file name thus curated.
|
||||
*/
|
||||
private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
|
||||
@@ -61,5 +61,5 @@ public class FlinkTaskContextSupplier extends TaskContextSupplier {
|
||||
// no operation for now
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -58,7 +58,8 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
|
||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
|
||||
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
|
||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||
import org.apache.hudi.util.FlinkClientUtil;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
@@ -383,7 +384,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
||||
@Override
|
||||
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
|
||||
HoodieTableMetaClient metaClient = createMetaClient(true);
|
||||
new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
|
||||
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.current(), instantTime);
|
||||
return getTableAndInitCtx(metaClient, operationType);
|
||||
}
|
||||
|
||||
@@ -395,7 +397,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
||||
*/
|
||||
public void upgradeDowngrade(String instantTime) {
|
||||
HoodieTableMetaClient metaClient = createMetaClient(true);
|
||||
new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
|
||||
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.current(), instantTime);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade {
|
||||
public FlinkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
super(metaClient, config, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config,
|
||||
HoodieEngineContext context, String instantTime) {
|
||||
try {
|
||||
new FlinkUpgradeDowngrade(metaClient, config, context).run(toVersion, instantTime);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||
if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
|
||||
return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
|
||||
return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
|
||||
return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime);
|
||||
} else {
|
||||
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||
if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
|
||||
return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
|
||||
return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
|
||||
return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime);
|
||||
} else {
|
||||
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
/**
|
||||
* Flink upgrade and downgrade helper.
|
||||
*/
|
||||
public class FlinkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper {
|
||||
|
||||
private static final FlinkUpgradeDowngradeHelper SINGLETON_INSTANCE =
|
||||
new FlinkUpgradeDowngradeHelper();
|
||||
|
||||
private FlinkUpgradeDowngradeHelper() {
|
||||
}
|
||||
|
||||
public static FlinkUpgradeDowngradeHelper getInstance() {
|
||||
return SINGLETON_INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPartitionColumns(HoodieWriteConfig config) {
|
||||
return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||
|
||||
public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler {
|
||||
|
||||
@Override
|
||||
String getPartitionColumns(HoodieWriteConfig config) {
|
||||
return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
|
||||
}
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
/**
|
||||
* Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
|
||||
*/
|
||||
public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler {
|
||||
|
||||
@Override
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
|
||||
}
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Downgrade handler to assist in downgrading hoodie table from version 3 to 2.
|
||||
*/
|
||||
public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
|
||||
|
||||
@Override
|
||||
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
if (config.isMetadataTableEnabled()) {
|
||||
// Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous
|
||||
// removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the
|
||||
// table has been updated and is not forward compatible. Hence, we need to delete the table.
|
||||
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler {
|
||||
@Override
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
|
||||
}
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 2 to 3.
|
||||
*/
|
||||
public class TwoToThreeUpgradeHandler implements UpgradeHandler {
|
||||
@Override
|
||||
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) {
|
||||
if (config.isMetadataTableEnabled()) {
|
||||
// Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not
|
||||
// sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the
|
||||
// table has been updated and is not backward compatible.
|
||||
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackRequest;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
|
||||
*/
|
||||
public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
|
||||
|
||||
@Override
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
|
||||
}
|
||||
|
||||
@Override
|
||||
List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
|
||||
List<ListingBasedRollbackRequest> rollbackRequests) {
|
||||
List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
|
||||
.getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
|
||||
return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
|
||||
}
|
||||
}
|
||||
@@ -53,8 +53,8 @@ import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
import org.apache.hudi.table.action.compact.SparkCompactHelpers;
|
||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||
import org.apache.hudi.table.upgrade.AbstractUpgradeDowngrade;
|
||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
|
||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@@ -414,20 +414,22 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
@Override
|
||||
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
|
||||
HoodieTableMetaClient metaClient = createMetaClient(true);
|
||||
AbstractUpgradeDowngrade upgradeDowngrade = new SparkUpgradeDowngrade(metaClient, config, context);
|
||||
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
|
||||
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
|
||||
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
|
||||
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
|
||||
this.txnManager.beginTransaction();
|
||||
try {
|
||||
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
|
||||
this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER));
|
||||
new SparkUpgradeDowngrade(metaClient, config, context)
|
||||
.run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
|
||||
new UpgradeDowngrade(
|
||||
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.current(), instantTime);
|
||||
} finally {
|
||||
this.txnManager.endTransaction();
|
||||
}
|
||||
} else {
|
||||
upgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
|
||||
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
|
||||
}
|
||||
}
|
||||
metaClient.validateTableProperties(config.getProps(), operationType);
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.HoodieSparkUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
/**
|
||||
* Upgrade handle to assist in upgrading hoodie table from version 1 to 2.
|
||||
*/
|
||||
public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler {
|
||||
|
||||
@Override
|
||||
String getPartitionColumns(HoodieWriteConfig config) {
|
||||
return HoodieSparkUtils.getPartitionColumns(config.getProps());
|
||||
}
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
/**
|
||||
* Downgrade handle to assist in downgrading hoodie table from version 1 to 0.
|
||||
*/
|
||||
public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler {
|
||||
|
||||
@Override
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieSparkTable.create(config, context);
|
||||
}
|
||||
}
|
||||
@@ -1,75 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade {
|
||||
|
||||
public SparkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
super(metaClient, config, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(HoodieTableMetaClient metaClient,
|
||||
HoodieTableVersion toVersion,
|
||||
HoodieWriteConfig config,
|
||||
HoodieEngineContext context,
|
||||
String instantTime) {
|
||||
try {
|
||||
new SparkUpgradeDowngrade(metaClient, config, context).run(toVersion, instantTime);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||
if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
|
||||
return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) {
|
||||
return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) {
|
||||
return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime);
|
||||
} else {
|
||||
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
|
||||
if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) {
|
||||
return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) {
|
||||
return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime);
|
||||
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) {
|
||||
return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime);
|
||||
} else {
|
||||
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.HoodieSparkUtils;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
/**
|
||||
* Spark upgrade and downgrade helper.
|
||||
*/
|
||||
public class SparkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper {
|
||||
|
||||
private static final SparkUpgradeDowngradeHelper SINGLETON_INSTANCE =
|
||||
new SparkUpgradeDowngradeHelper();
|
||||
|
||||
private SparkUpgradeDowngradeHelper() {
|
||||
}
|
||||
|
||||
public static SparkUpgradeDowngradeHelper getInstance() {
|
||||
return SINGLETON_INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieSparkTable.create(config, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPartitionColumns(HoodieWriteConfig config) {
|
||||
return HoodieSparkUtils.getPartitionColumns(config.getProps());
|
||||
}
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
/**
|
||||
* Downgrade handle to assist in downgrading hoodie table from version 2 to 1.
|
||||
*/
|
||||
public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler {
|
||||
|
||||
@Override
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieSparkTable.create(config, context);
|
||||
}
|
||||
}
|
||||
@@ -1,53 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.upgrade;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackRequest;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Upgrade handle to assist in upgrading hoodie table from version 0 to 1.
|
||||
*/
|
||||
public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
|
||||
|
||||
@Override
|
||||
HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return HoodieSparkTable.create(config, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
|
||||
List<ListingBasedRollbackRequest> rollbackRequests) {
|
||||
List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
|
||||
.getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
|
||||
return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
|
||||
}
|
||||
}
|
||||
@@ -71,7 +71,8 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
|
||||
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
|
||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
@@ -885,7 +886,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
||||
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
|
||||
|
||||
// Test downgrade by running the downgrader
|
||||
new SparkUpgradeDowngrade(metaClient, writeConfig, context).run(metaClient, HoodieTableVersion.TWO, writeConfig, context, null);
|
||||
new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.TWO, null);
|
||||
|
||||
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.TWO.versionCode());
|
||||
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
|
||||
|
||||
@@ -85,7 +85,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Unit tests {@link SparkUpgradeDowngrade}.
|
||||
* Unit tests {@link UpgradeDowngrade}.
|
||||
*/
|
||||
public class TestUpgradeDowngrade extends HoodieClientTestBase {
|
||||
|
||||
@@ -177,7 +177,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
|
||||
}
|
||||
|
||||
// should re-create marker files for 2nd commit since its pending.
|
||||
new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ONE, cfg, context, null);
|
||||
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.ONE, null);
|
||||
|
||||
// assert marker files
|
||||
assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
|
||||
@@ -218,7 +219,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
|
||||
downgradeTableConfigsFromTwoToOne(cfg);
|
||||
|
||||
// perform upgrade
|
||||
new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.TWO, cfg, context, null);
|
||||
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
|
||||
.run(HoodieTableVersion.TWO, null);
|
||||
|
||||
// verify hoodie.table.version got upgraded
|
||||
metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
|
||||
@@ -321,7 +323,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
|
||||
}
|
||||
|
||||
// downgrade should be performed. all marker files should be deleted
|
||||
new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, toVersion, cfg, context, null);
|
||||
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
|
||||
.run(toVersion, null);
|
||||
|
||||
// assert marker files
|
||||
assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE);
|
||||
@@ -557,7 +560,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
|
||||
|
||||
private void createResidualFile() throws IOException {
|
||||
Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE);
|
||||
Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
|
||||
Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE);
|
||||
|
||||
// Step1: Copy hoodie.properties to hoodie.properties.orig
|
||||
FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile,
|
||||
|
||||
Reference in New Issue
Block a user