diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index f86937d23..82688fecc 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -38,7 +38,8 @@ import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; -import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade; +import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.utilities.HDFSParquetImporter; import org.apache.hudi.utilities.HDFSParquetImporter.Config; import org.apache.hudi.utilities.HoodieCleaner; @@ -453,7 +454,8 @@ public class SparkMain { .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build(); try { - new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null); + new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.valueOf(toVersion), null); LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion)); return 0; } catch (Exception e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java deleted file mode 100644 index 0a74689c5..000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.config.ConfigProperty; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.common.util.FileIOUtils; -import org.apache.hudi.config.HoodieWriteConfig; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * Helper class to assist in upgrading/downgrading Hoodie when there is a version change. - */ -public abstract class AbstractUpgradeDowngrade { - - private static final Logger LOG = LogManager.getLogger(AbstractUpgradeDowngrade.class); - public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated"; - - private HoodieTableMetaClient metaClient; - protected HoodieWriteConfig config; - protected HoodieEngineContext context; - private transient FileSystem fs; - private Path updatedPropsFilePath; - private Path propsFilePath; - - /** - * Perform Upgrade or Downgrade steps if required and updated table version if need be. - *

- * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths. - * - * Essentially, if a dataset was created using any pre 0.6.0(for eg 0.5.3), and Hoodie version was upgraded to 0.6.0, - * Hoodie table version gets bumped to 1 and there are some upgrade steps need to be executed before doing any writes. - * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded - * to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes. - * - * On a high level, these are the steps performed - * - * Step1 : Understand current hoodie table version and table version from hoodie.properties file - * Step2 : Delete any left over .updated from previous upgrade/downgrade - * Step3 : If version are different, perform upgrade/downgrade. - * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated - * Step6 : Rename hoodie.properties.updated to hoodie.properties - *

- * - * @param metaClient instance of {@link HoodieTableMetaClient} to use - * @param toVersion version to which upgrade or downgrade has to be done. - * @param config instance of {@link HoodieWriteConfig} to use. - * @param context instance of {@link HoodieEngineContext} to use. - * @param instantTime current instant time that should not be touched. - */ - public abstract void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config, - HoodieEngineContext context, String instantTime); - - public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) { - HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion(); - // Ensure no inflight commits & versions are same - return toVersion.versionCode() != fromVersion.versionCode(); - } - - protected AbstractUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) { - this.metaClient = metaClient; - this.config = config; - this.context = context; - this.fs = metaClient.getFs(); - this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE); - this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); - } - - protected void run(HoodieTableVersion toVersion, String instantTime) throws IOException { - // Fetch version from property file and current version - HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion(); - if (!needsUpgradeOrDowngrade(toVersion)) { - return; - } - - if (fs.exists(updatedPropsFilePath)) { - // this can be left over .updated file from a failed attempt before. Many cases exist here. - // a) We failed while writing the .updated file and it's content is partial (e.g hdfs) - // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway - // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file - // All cases, it simply suffices to delete the file and proceed. - LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath))); - fs.delete(updatedPropsFilePath, false); - } - - // Perform the actual upgrade/downgrade; this has to be idempotent, for now. - LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion); - Map tableProps = new HashMap<>(); - if (fromVersion.versionCode() < toVersion.versionCode()) { - // upgrade - while (fromVersion.versionCode() < toVersion.versionCode()) { - HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1); - tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime)); - fromVersion = nextVersion; - } - } else { - // downgrade - while (fromVersion.versionCode() > toVersion.versionCode()) { - HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1); - tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime)); - fromVersion = prevVersion; - } - } - - // Write out the current version in hoodie.properties.updated file - for (Map.Entry entry: tableProps.entrySet()) { - metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue()); - } - metaClient.getTableConfig().setTableVersion(toVersion); - createUpdatedFile(metaClient.getTableConfig().getProps()); - - // because for different fs the fs.rename have different action,such as: - // a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false - // b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture - // c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException - // so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent - if (fs.exists(propsFilePath)) { - fs.delete(propsFilePath, false); - } - // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores. - // But as long as this does not leave a partial hoodie.properties file, we are okay. - fs.rename(updatedPropsFilePath, propsFilePath); - } - - private void createUpdatedFile(Properties props) throws IOException { - try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) { - props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); - } - } - - protected abstract Map upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime); - - protected abstract Map downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime); -} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseUpgradeDowngradeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseUpgradeDowngradeHelper.java new file mode 100644 index 000000000..d3f157be9 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseUpgradeDowngradeHelper.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +/** + * Interface for engine-specific logic needed for upgrade and downgrade actions. + */ +public interface BaseUpgradeDowngradeHelper { + /** + * @param config Write config. + * @param context {@link HoodieEngineContext} instance to use. + * @return A new Hudi table for upgrade and downgrade actions. + */ + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); + + /** + * @param config Write config. + * @return partition columns in String. + */ + String getPartitionColumns(HoodieWriteConfig config); +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java index 7501ed5fa..24b9d6f5d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java @@ -32,10 +32,13 @@ public interface DowngradeHandler { /** * to be invoked to downgrade hoodie table from one version to a lower version. * - * @param config instance of {@link HoodieWriteConfig} to be used. - * @param context instance of {@link HoodieEngineContext} to be used. - * @param instantTime current instant time that should not touched. + * @param config instance of {@link HoodieWriteConfig} to be used. + * @param context instance of {@link HoodieEngineContext} to be used. + * @param instantTime current instant time that should not touched. + * @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used. * @return Map of config properties and its values to be added to table properties. */ - Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime); + Map downgrade( + HoodieWriteConfig config, HoodieEngineContext context, String instantTime, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java similarity index 77% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java index e3a14e487..dddd5f4ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java @@ -27,16 +27,19 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import java.util.HashMap; import java.util.Map; -public abstract class BaseOneToTwoUpgradeHandler implements UpgradeHandler { +/** + * Upgrade handle to assist in upgrading hoodie table from version 1 to 2. + */ +public class OneToTwoUpgradeHandler implements UpgradeHandler { @Override - public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + public Map upgrade( + HoodieWriteConfig config, HoodieEngineContext context, String instantTime, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { Map tablePropsToAdd = new HashMap<>(); - tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, getPartitionColumns(config)); + tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, upgradeDowngradeHelper.getPartitionColumns(config)); tablePropsToAdd.put(HoodieTableConfig.RECORDKEY_FIELDS, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())); tablePropsToAdd.put(HoodieTableConfig.BASE_FILE_FORMAT, config.getString(HoodieTableConfig.BASE_FILE_FORMAT)); return tablePropsToAdd; } - - abstract String getPartitionColumns(HoodieWriteConfig config); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java similarity index 82% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java index 5997e1812..e6051cf32 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java @@ -32,12 +32,17 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler { +/** + * Downgrade handle to assist in downgrading hoodie table from version 1 to 0. + */ +public class OneToZeroDowngradeHandler implements DowngradeHandler { @Override - public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + public Map downgrade( + HoodieWriteConfig config, HoodieEngineContext context, String instantTime, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { + HoodieTable table = upgradeDowngradeHelper.getTable(config, context); // fetch pending commit info - HoodieTable table = getTable(config, context); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); List commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); for (HoodieInstant inflightInstant : commits) { @@ -47,6 +52,4 @@ public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler } return Collections.EMPTY_MAP; } - - abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java similarity index 75% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java index 9211144d0..964859c0a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.table.upgrade; @@ -32,7 +33,7 @@ import java.util.Map; public class ThreeToTwoDowngradeHandler implements DowngradeHandler { @Override - public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { if (config.isMetadataTableEnabled()) { // Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous // removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java similarity index 90% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java index 621711a3f..ee638a16f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java @@ -46,12 +46,16 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX; - -public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler { +/** + * Downgrade handler to assist in downgrading hoodie table from version 2 to 1. + */ +public class TwoToOneDowngradeHandler implements DowngradeHandler { @Override - public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - HoodieTable table = getTable(config, context); + public Map downgrade( + HoodieWriteConfig config, HoodieEngineContext context, String instantTime, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { + HoodieTable table = upgradeDowngradeHelper.getTable(config, context); HoodieTableMetaClient metaClient = table.getMetaClient(); // re-create marker files if any partial timeline server based markers are found @@ -69,8 +73,6 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler { return Collections.EMPTY_MAP; } - abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); - /** * Converts the markers in new format(timeline server based) to old format of direct markers, * i.e., one marker file per data file, without MARKERS.type file. @@ -106,8 +108,7 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler { // Deletes marker type file MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir); // Deletes timeline server based markers - deleteTimelineBasedMarkerFiles( - context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism()); + deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism); break; default: throw new HoodieException("The marker type \"" + markerTypeOption.get().name() @@ -116,8 +117,7 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler { } else { // In case of partial failures during downgrade, there is a chance that marker type file was deleted, // but timeline server based marker files are left. So deletes them if any - deleteTimelineBasedMarkerFiles( - context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism()); + deleteTimelineBasedMarkerFiles(context, markerDir, fileSystem, parallelism); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java similarity index 75% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java index 278e41384..6a825e159 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.hudi.table.upgrade; @@ -31,7 +32,7 @@ import java.util.Map; */ public class TwoToThreeUpgradeHandler implements UpgradeHandler { @Override - public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { if (config.isMetadataTableEnabled()) { // Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not // sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java new file mode 100644 index 000000000..c5ae043d1 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieUpgradeDowngradeException; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Helper class to assist in upgrading/downgrading Hoodie when there is a version change. + */ +public class UpgradeDowngrade { + + private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class); + public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated"; + + private final BaseUpgradeDowngradeHelper upgradeDowngradeHelper; + private HoodieTableMetaClient metaClient; + protected HoodieWriteConfig config; + protected HoodieEngineContext context; + private transient FileSystem fs; + private Path updatedPropsFilePath; + private Path propsFilePath; + + public UpgradeDowngrade( + HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { + this.metaClient = metaClient; + this.config = config; + this.context = context; + this.fs = metaClient.getFs(); + this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE); + this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); + this.upgradeDowngradeHelper = upgradeDowngradeHelper; + } + + public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) { + HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion(); + // Ensure versions are same + return toVersion.versionCode() != fromVersion.versionCode(); + } + + /** + * Perform Upgrade or Downgrade steps if required and updated table version if need be. + *

+ * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths. + *

+ * Essentially, if a dataset was created using an previous table version in an older release, + * and Hoodie version was upgraded to a new release with new table version supported, + * Hoodie table version gets bumped to the new version and there are some upgrade steps need + * to be executed before doing any writes. + *

+ * Similarly, if a dataset was created using an newer table version in an newer release, + * and then hoodie was downgraded to an older release or to older Hoodie table version, + * then some downgrade steps need to be executed before proceeding w/ any writes. + *

+ * Below shows the table version corresponding to the Hudi release: + * Hudi release -> table version + * pre 0.6.0 -> v0 + * 0.6.0 to 0.8.0 -> v1 + * 0.9.0 -> v2 + * 0.10.0 to current -> v3 + *

+ * On a high level, these are the steps performed + *

+ * Step1 : Understand current hoodie table version and table version from hoodie.properties file + * Step2 : Delete any left over .updated from previous upgrade/downgrade + * Step3 : If version are different, perform upgrade/downgrade. + * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated + * Step6 : Rename hoodie.properties.updated to hoodie.properties + *

+ * + * @param toVersion version to which upgrade or downgrade has to be done. + * @param instantTime current instant time that should not be touched. + */ + public void run(HoodieTableVersion toVersion, String instantTime) { + try { + // Fetch version from property file and current version + HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion(); + if (!needsUpgradeOrDowngrade(toVersion)) { + return; + } + + if (fs.exists(updatedPropsFilePath)) { + // this can be left over .updated file from a failed attempt before. Many cases exist here. + // a) We failed while writing the .updated file and it's content is partial (e.g hdfs) + // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway + // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file + // All cases, it simply suffices to delete the file and proceed. + LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath))); + fs.delete(updatedPropsFilePath, false); + } + + // Perform the actual upgrade/downgrade; this has to be idempotent, for now. + LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion); + Map tableProps = new HashMap<>(); + if (fromVersion.versionCode() < toVersion.versionCode()) { + // upgrade + while (fromVersion.versionCode() < toVersion.versionCode()) { + HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1); + tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime)); + fromVersion = nextVersion; + } + } else { + // downgrade + while (fromVersion.versionCode() > toVersion.versionCode()) { + HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1); + tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime)); + fromVersion = prevVersion; + } + } + + // Write out the current version in hoodie.properties.updated file + for (Map.Entry entry : tableProps.entrySet()) { + metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue()); + } + metaClient.getTableConfig().setTableVersion(toVersion); + createUpdatedFile(metaClient.getTableConfig().getProps()); + + // because for different fs the fs.rename have different action,such as: + // a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false + // b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture + // c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException + // so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent + if (fs.exists(propsFilePath)) { + fs.delete(propsFilePath, false); + } + // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores. + // But as long as this does not leave a partial hoodie.properties file, we are okay. + fs.rename(updatedPropsFilePath, propsFilePath); + } catch (IOException e) { + throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e); + } + } + + private void createUpdatedFile(Properties props) throws IOException { + try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) { + props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); + } + } + + protected Map upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { + return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) { + return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) { + return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); + } else { + throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); + } + } + + protected Map downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { + return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) { + return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) { + return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); + } else { + throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java index 8ca6f0e86..9dc477ffc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java @@ -32,10 +32,13 @@ public interface UpgradeHandler { /** * to be invoked to upgrade hoodie table from one version to a higher version. * - * @param config instance of {@link HoodieWriteConfig} to be used. - * @param context instance of {@link HoodieEngineContext} to be used. - * @param instantTime current instant time that should not be touched. + * @param config instance of {@link HoodieWriteConfig} to be used. + * @param context instance of {@link HoodieEngineContext} to be used. + * @param instantTime current instant time that should not be touched. + * @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used. * @return Map of config properties and its values to be added to table properties. */ - Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime); + Map upgrade( + HoodieWriteConfig config, HoodieEngineContext context, String instantTime, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java similarity index 79% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index f0e3e4f1e..18815b2e1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -33,6 +34,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.rollback.BaseRollbackHelper; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; @@ -46,13 +49,17 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler { +/** + * Upgrade handle to assist in upgrading hoodie table from version 0 to 1. + */ +public class ZeroToOneUpgradeHandler implements UpgradeHandler { @Override - public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + public Map upgrade( + HoodieWriteConfig config, HoodieEngineContext context, String instantTime, + BaseUpgradeDowngradeHelper upgradeDowngradeHelper) { // fetch pending commit info - //HoodieSparkTable table = HoodieSparkTable.create(config, context); - HoodieTable table = getTable(config, context); + HoodieTable table = upgradeDowngradeHelper.getTable(config, context); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); @@ -67,8 +74,6 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler { return Collections.EMPTY_MAP; } - abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); - /** * Recreate markers in new format. * Step1: Delete existing markers @@ -76,14 +81,14 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler { * Step3: recreate markers for all interested files. * * @param commitInstantTime instant of interest for which markers need to be recreated. - * @param table instance of {@link HoodieTable} to use - * @param context instance of {@link HoodieEngineContext} to use + * @param table instance of {@link HoodieTable} to use + * @param context instance of {@link HoodieEngineContext} to use * @throws HoodieRollbackException on any exception during upgrade. */ protected void recreateMarkers(final String commitInstantTime, - HoodieTable table, - HoodieEngineContext context, - int parallelism) throws HoodieRollbackException { + HoodieTable table, + HoodieEngineContext context, + int parallelism) throws HoodieRollbackException { try { // fetch hoodie instant Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() @@ -121,9 +126,13 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler { } } - abstract List getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - HoodieEngineContext context, Option commitInstantOpt, - List rollbackRequests); + List getListBasedRollBackStats( + HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, + Option commitInstantOpt, List rollbackRequests) { + List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config) + .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests); + return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests); + } /** * Curates file name for marker from existing log file path. @@ -131,6 +140,7 @@ public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler { * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND * * @param logFilePath log file path for which marker file name needs to be generated. + * @param table {@link HoodieTable} instance to use * @return the marker file name thus curated. */ private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java index aab248fc3..10c5ac5d6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java @@ -61,5 +61,5 @@ public class FlinkTaskContextSupplier extends TaskContextSupplier { // no operation for now return Option.empty(); } - + } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index e279940b6..669be164b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -58,7 +58,8 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; +import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.util.FlinkClientUtil; import com.codahale.metrics.Timer; @@ -383,7 +384,8 @@ public class HoodieFlinkWriteClient extends @Override protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); - new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.current(), instantTime); return getTableAndInitCtx(metaClient, operationType); } @@ -395,7 +397,8 @@ public class HoodieFlinkWriteClient extends */ public void upgradeDowngrade(String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); - new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.current(), instantTime); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java deleted file mode 100644 index 832db1d7d..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.config.ConfigProperty; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieUpgradeDowngradeException; - -import java.io.IOException; -import java.util.Map; - -public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade { - public FlinkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) { - super(metaClient, config, context); - } - - @Override - public void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config, - HoodieEngineContext context, String instantTime) { - try { - new FlinkUpgradeDowngrade(metaClient, config, context).run(toVersion, instantTime); - } catch (IOException e) { - throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e); - } - } - - @Override - protected Map upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { - if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { - return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) { - return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) { - return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime); - } else { - throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); - } - } - - @Override - protected Map downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { - if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { - return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) { - return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) { - return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime); - } else { - throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); - } - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java new file mode 100644 index 000000000..d097d2e60 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; + +/** + * Flink upgrade and downgrade helper. + */ +public class FlinkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper { + + private static final FlinkUpgradeDowngradeHelper SINGLETON_INSTANCE = + new FlinkUpgradeDowngradeHelper(); + + private FlinkUpgradeDowngradeHelper() { + } + + public static FlinkUpgradeDowngradeHelper getInstance() { + return SINGLETON_INSTANCE; + } + + @Override + public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + } + + @Override + public String getPartitionColumns(HoodieWriteConfig config) { + return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java deleted file mode 100644 index b84ce6dd5..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; - -public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler { - - @Override - String getPartitionColumns(HoodieWriteConfig config) { - return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java deleted file mode 100644 index 5d6e57e42..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.table.HoodieTable; - -/** - * Downgrade handle to assist in downgrading hoodie table from version 1 to 0. - */ -public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler { - - @Override - HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { - return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java deleted file mode 100644 index e6b3c3029..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ThreeToTwoDowngradeHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.config.ConfigProperty; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; - -import java.util.Collections; -import java.util.Map; - -/** - * Downgrade handler to assist in downgrading hoodie table from version 3 to 2. - */ -public class ThreeToTwoDowngradeHandler implements DowngradeHandler { - - @Override - public Map downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - if (config.isMetadataTableEnabled()) { - // Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous - // removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the - // table has been updated and is not forward compatible. Hence, we need to delete the table. - HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context); - } - return Collections.emptyMap(); - } -} \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java deleted file mode 100644 index ec8098aa6..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.table.HoodieTable; - -public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler { - @Override - HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { - return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - } -} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java deleted file mode 100644 index 9f5644aef..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.config.ConfigProperty; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; - -import java.util.Collections; -import java.util.Map; - -/** - * UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 2 to 3. - */ -public class TwoToThreeUpgradeHandler implements UpgradeHandler { - @Override - public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - if (config.isMetadataTableEnabled()) { - // Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not - // sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the - // table has been updated and is not backward compatible. - HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context); - } - return Collections.emptyMap(); - } -} \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java deleted file mode 100644 index 284d3bcdf..000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.avro.model.HoodieRollbackRequest; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.BaseRollbackHelper; -import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; -import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; - -import java.util.List; - -/** - * Upgrade handle to assist in upgrading hoodie table from version 0 to 1. - */ -public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler { - - @Override - HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { - return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - } - - @Override - List getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option commitInstantOpt, - List rollbackRequests) { - List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config) - .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests); - return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 1c5bdf5ec..dd9f43d16 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -53,8 +53,8 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.SparkCompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.hudi.table.upgrade.AbstractUpgradeDowngrade; -import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade; +import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; @@ -414,20 +414,22 @@ public class SparkRDDWriteClient extends @Override protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); - AbstractUpgradeDowngrade upgradeDowngrade = new SparkUpgradeDowngrade(metaClient, config, context); + UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( + metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { this.txnManager.beginTransaction(); try { // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER)); - new SparkUpgradeDowngrade(metaClient, config, context) - .run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + new UpgradeDowngrade( + metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.current(), instantTime); } finally { this.txnManager.endTransaction(); } } else { - upgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + upgradeDowngrade.run(HoodieTableVersion.current(), instantTime); } } metaClient.validateTableProperties(config.getProps(), operationType); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java deleted file mode 100644 index 7fb286eb1..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.HoodieSparkUtils; -import org.apache.hudi.config.HoodieWriteConfig; - -/** - * Upgrade handle to assist in upgrading hoodie table from version 1 to 2. - */ -public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler { - - @Override - String getPartitionColumns(HoodieWriteConfig config) { - return HoodieSparkUtils.getPartitionColumns(config.getProps()); - } -} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java deleted file mode 100644 index 2e6064a40..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; - -/** - * Downgrade handle to assist in downgrading hoodie table from version 1 to 0. - */ -public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler { - - @Override - HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { - return HoodieSparkTable.create(config, context); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java deleted file mode 100644 index 83f29b544..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.config.ConfigProperty; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieUpgradeDowngradeException; - -import java.io.IOException; -import java.util.Map; - -public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade { - - public SparkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) { - super(metaClient, config, context); - } - - @Override - public void run(HoodieTableMetaClient metaClient, - HoodieTableVersion toVersion, - HoodieWriteConfig config, - HoodieEngineContext context, - String instantTime) { - try { - new SparkUpgradeDowngrade(metaClient, config, context).run(toVersion, instantTime); - } catch (IOException e) { - throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e); - } - } - - @Override - protected Map upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { - if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { - return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) { - return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.THREE) { - return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime); - } else { - throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); - } - } - - @Override - protected Map downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { - if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { - return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) { - return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime); - } else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.TWO) { - return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime); - } else { - throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); - } - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java new file mode 100644 index 000000000..f943b7017 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; + +/** + * Spark upgrade and downgrade helper. + */ +public class SparkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper { + + private static final SparkUpgradeDowngradeHelper SINGLETON_INSTANCE = + new SparkUpgradeDowngradeHelper(); + + private SparkUpgradeDowngradeHelper() { + } + + public static SparkUpgradeDowngradeHelper getInstance() { + return SINGLETON_INSTANCE; + } + + @Override + public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieSparkTable.create(config, context); + } + + @Override + public String getPartitionColumns(HoodieWriteConfig config) { + return HoodieSparkUtils.getPartitionColumns(config.getProps()); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java deleted file mode 100644 index 055d33047..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; - -/** - * Downgrade handle to assist in downgrading hoodie table from version 2 to 1. - */ -public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler { - - @Override - HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { - return HoodieSparkTable.create(config, context); - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java deleted file mode 100644 index 2cfb39c9b..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.upgrade; - -import org.apache.hudi.avro.model.HoodieRollbackRequest; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.rollback.BaseRollbackHelper; -import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; -import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; - -import java.util.List; - -/** - * Upgrade handle to assist in upgrading hoodie table from version 0 to 1. - */ -public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler { - - @Override - HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { - return HoodieSparkTable.create(config, context); - } - - @Override - List getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option commitInstantOpt, - List rollbackRequests) { - List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config) - .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests); - return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests); - } -} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f7cb22cda..468444bec 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -71,7 +71,8 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade; +import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.MetadataMergeWriteStatus; import org.apache.hadoop.fs.FSDataOutputStream; @@ -885,7 +886,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); // Test downgrade by running the downgrader - new SparkUpgradeDowngrade(metaClient, writeConfig, context).run(metaClient, HoodieTableVersion.TWO, writeConfig, context, null); + new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.TWO, null); assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.TWO.versionCode()); assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 792da4e08..19ec4e6d0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -85,7 +85,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Unit tests {@link SparkUpgradeDowngrade}. + * Unit tests {@link UpgradeDowngrade}. */ public class TestUpgradeDowngrade extends HoodieClientTestBase { @@ -177,7 +177,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { } // should re-create marker files for 2nd commit since its pending. - new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ONE, cfg, context, null); + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.ONE, null); // assert marker files assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices); @@ -218,7 +219,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { downgradeTableConfigsFromTwoToOne(cfg); // perform upgrade - new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.TWO, cfg, context, null); + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.TWO, null); // verify hoodie.table.version got upgraded metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) @@ -321,7 +323,8 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { } // downgrade should be performed. all marker files should be deleted - new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, toVersion, cfg, context, null); + new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(toVersion, null); // assert marker files assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE); @@ -557,7 +560,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { private void createResidualFile() throws IOException { Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); - Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); + Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); // Step1: Copy hoodie.properties to hoodie.properties.orig FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile,