From 23dca6c237517bc9d9407556af6b1042f09ae76b Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 14 Aug 2021 17:20:23 -0700 Subject: [PATCH] [HUDI-2268] Add upgrade and downgrade to and from 0.9.0 (#3470) - Added upgrade and downgrade steps to and from 0.9.0. Upgrade adds a few table properties. Downgrade converts any timeline-server-based marker files back to direct markers. --- .../commands/UpgradeOrDowngradeCommand.java | 24 +- .../hudi/table/marker/DirectWriteMarkers.java | 15 +- .../upgrade/AbstractUpgradeDowngrade.java | 23 +- .../upgrade/BaseOneToTwoUpgradeHandler.java | 42 ++++ .../BaseOneToZeroDowngradeHandler.java | 52 +++++ .../upgrade/BaseTwoToOneDowngradeHandler.java | 142 ++++++++++++ .../upgrade/BaseZeroToOneUpgradeHandler.java | 144 ++++++++++++ .../hudi/table/upgrade/DowngradeHandler.java | 6 +- .../hudi/table/upgrade/UpgradeHandler.java | 6 +- .../table/upgrade/FlinkUpgradeDowngrade.java | 14 +- .../table/upgrade/OneToTwoUpgradeHandler.java | 30 +++ .../upgrade/OneToZeroDowngradeHandler.java | 22 +- .../upgrade/TwoToOneDowngradeHandler.java | 32 +++ .../upgrade/ZeroToOneUpgradeHandler.java | 107 +-------- .../table/upgrade/OneToTwoUpgradeHandler.java | 33 +++ .../upgrade/OneToZeroDowngradeHandler.java | 22 +- .../table/upgrade/SparkUpgradeDowngrade.java | 15 +- .../upgrade/TwoToOneDowngradeHandler.java | 35 +++ .../upgrade/ZeroToOneUpgradeHandler.java | 107 +-------- .../org/apache/hudi/HoodieSparkUtils.scala | 38 +++- .../table/upgrade/TestUpgradeDowngrade.java | 209 +++++++++++++++--- .../hudi/common/table/HoodieTableVersion.java | 8 +- .../apache/hudi/common/util/MarkerUtils.java | 11 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 10 +- .../org/apache/hudi/HoodieWriterUtils.scala | 22 +- .../handlers/marker/MarkerDirState.java | 1 - .../utilities/deltastreamer/DeltaSync.java | 7 +- 27 files changed, 851 insertions(+), 326 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java index deb9e0727..a5e513c61 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java @@ -28,15 +28,17 @@ import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; 
/** - * CLI command to assist in upgrading/downgrading Hoodie dataset to a different version. + * CLI command to assist in upgrading/downgrading Hoodie table to a different version. */ +@Component public class UpgradeOrDowngradeCommand implements CommandMarker { - @CliCommand(value = "upgrade hoodie dataset ", help = "Upgrades hoodie dataset") - public String upgradeHoodieDataset( - @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, + @CliCommand(value = "upgrade table", help = "Upgrades a table") + public String upgradeHoodieTable( + @CliOption(key = {"toVersion"}, help = "To version of Hoodie table to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", @@ -52,14 +54,14 @@ public class UpgradeOrDowngradeCommand implements CommandMarker { int exitCode = process.waitFor(); HoodieCLI.refreshTableMetadata(); if (exitCode != 0) { - return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion); + return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion); } - return String.format("Hoodie dataset upgraded/downgraded to ", toVersion); + return String.format("Hoodie table upgraded/downgraded to %s", toVersion); } - @CliCommand(value = "downgrade hoodie dataset ", help = "Upgrades hoodie dataset") - public String downgradeHoodieDataset( - @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, + @CliCommand(value = "downgrade table", help = "Downgrades a table") + public String downgradeHoodieTable( + @CliOption(key = {"toVersion"}, help = "To version of Hoodie table to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", @@ -74,8 +76,8 @@ public class UpgradeOrDowngradeCommand implements CommandMarker { int exitCode = process.waitFor(); HoodieCLI.refreshTableMetadata(); if (exitCode != 0) { - return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion); + return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion); } - return String.format("Hoodie dataset upgraded/downgraded to ", toVersion); + return String.format("Hoodie table upgraded/downgraded to %s", toVersion); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index 1223cb75b..0059c7f8d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -166,10 +166,23 @@ public class DirectWriteMarkers extends WriteMarkers { return markerFiles; } + /** + * Creates a marker file based on the full marker 
name excluding the base path and instant. + * + * @param markerName the full marker name, e.g., "2021/08/13/file1.marker.CREATE" + * @return path of the marker file + */ + public Option<Path> create(String markerName) { + return create(new Path(markerDirPath, markerName), true); + } + @Override protected Option<Path> create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists) { + return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists); + } + + private Option<Path> create(Path markerPath, boolean checkIfExists) { HoodieTimer timer = new HoodieTimer().startTimer(); - Path markerPath = getMarkerPath(partitionPath, dataFileName, type); Path dirPath = markerPath.getParent(); try { if (!fs.exists(dirPath)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java index 7719948f7..0a74689c5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/AbstractUpgradeDowngrade.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -33,6 +34,8 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; /** @@ -112,15 +115,27 @@ public abstract class AbstractUpgradeDowngrade { // Perform the actual upgrade/downgrade; this has to be idempotent, for now. 
LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion); + Map tableProps = new HashMap<>(); if (fromVersion.versionCode() < toVersion.versionCode()) { // upgrade - upgrade(fromVersion, toVersion, instantTime); + while (fromVersion.versionCode() < toVersion.versionCode()) { + HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1); + tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime)); + fromVersion = nextVersion; + } } else { // downgrade - downgrade(fromVersion, toVersion, instantTime); + while (fromVersion.versionCode() > toVersion.versionCode()) { + HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1); + tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime)); + fromVersion = prevVersion; + } } // Write out the current version in hoodie.properties.updated file + for (Map.Entry entry: tableProps.entrySet()) { + metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue()); + } metaClient.getTableConfig().setTableVersion(toVersion); createUpdatedFile(metaClient.getTableConfig().getProps()); @@ -143,7 +158,7 @@ public abstract class AbstractUpgradeDowngrade { } } - protected abstract void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime); + protected abstract Map upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime); - protected abstract void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime); + protected abstract Map downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java new file mode 100644 index 000000000..a83261564 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToTwoUpgradeHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.util.HashMap; +import java.util.Map; + +public abstract class BaseOneToTwoUpgradeHandler implements UpgradeHandler { + + @Override + public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>(); + tablePropsToAdd.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP, getPartitionColumns(config)); + tablePropsToAdd.put(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD.key())); + tablePropsToAdd.put(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP, config.getString(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)); + return tablePropsToAdd; + } + + abstract String getPartitionColumns(HoodieWriteConfig config); +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java new file mode 100644 index 000000000..5997e1812 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseOneToZeroDowngradeHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.WriteMarkers; +import org.apache.hudi.table.marker.WriteMarkersFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class BaseOneToZeroDowngradeHandler implements DowngradeHandler { + + @Override + public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + // fetch pending commit info + HoodieTable table = getTable(config, context); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); + for (HoodieInstant inflightInstant : commits) { + // delete existing markers + WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, inflightInstant.getTimestamp()); + writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + } + return Collections.emptyMap(); + } + + abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java new file mode 100644 index 000000000..7b67de852 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.MarkerUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.DirectWriteMarkers; + +import com.esotericsoftware.minlog.Log; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX; + +public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler { + + @Override + public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + HoodieTable table = getTable(config, context); + HoodieTableMetaClient metaClient = table.getMetaClient(); + + // re-create marker files if any partial timeline server based markers are found + HoodieTimeline inflightTimeline = metaClient.getCommitsTimeline().filterPendingExcludingCompaction(); + List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); + for (HoodieInstant inflightInstant : commits) { + // Converts the markers in new format to old format of direct markers + try { + convertToDirectMarkers( + inflightInstant.getTimestamp(), table, context, config.getMarkersDeleteParallelism()); + } catch (IOException e) { + throw new HoodieException("Converting marker files to DIRECT style failed during downgrade", e); + } + } + return Collections.emptyMap(); + } + + abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); + + /** + * Converts the markers in new format (timeline-server-based) to old format of direct markers, + * i.e., one marker file per data file, without the MARKERS.type file. + * This needs to be idempotent. + * 1. read all markers from timeline server based marker files + * 2. create direct style markers + * 3. delete marker type file + * 4. delete timeline server based marker files + * + * @param commitInstantTime instant of interest for marker conversion. 
+ * @param table instance of {@link HoodieTable} to use + * @param context instance of {@link HoodieEngineContext} to use + * @param parallelism parallelism to use + */ + private void convertToDirectMarkers(final String commitInstantTime, + HoodieTable table, + HoodieEngineContext context, + int parallelism) throws IOException { + String markerDir = table.getMetaClient().getMarkerFolderPath(commitInstantTime); + FileSystem fileSystem = FSUtils.getFs(markerDir, context.getHadoopConf().newCopy()); + Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(fileSystem, markerDir); + if (markerTypeOption.isPresent()) { + switch (markerTypeOption.get()) { + case TIMELINE_SERVER_BASED: + // Reads all markers written by the timeline server + Map<String, Set<String>> markersMap = + MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerDir, fileSystem, context, parallelism); + DirectWriteMarkers directWriteMarkers = new DirectWriteMarkers(table, commitInstantTime); + // Recreates the markers in the direct format + markersMap.values().stream().flatMap(Collection::stream) + .forEach(directWriteMarkers::create); + // Deletes marker type file + MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir); + // Deletes timeline server based markers + deleteTimelineBasedMarkerFiles(markerDir, fileSystem); + break; + default: + throw new HoodieException("The marker type \"" + markerTypeOption.get().name() + + "\" is not supported for rollback."); + } + } else { + // In case of partial failures during downgrade, there is a chance that the marker type file was deleted, + // but timeline server based marker files are left. So delete them if any + deleteTimelineBasedMarkerFiles(markerDir, fileSystem); + } + } + + private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException { + // Deletes timeline based marker files if any. + Path dirPath = new Path(markerDir); + FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); + Predicate<FileStatus> prefixFilter = fileStatus -> + fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX); + List<String> markerDirSubPaths = Arrays.stream(fileStatuses) + .filter(prefixFilter) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + markerDirSubPaths.forEach(fileToDelete -> { + try { + fileSystem.delete(new Path(fileToDelete), false); + } catch (IOException e) { + Log.warn("Deleting timeline-server-based marker files failed", e); + } + }); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java new file mode 100644 index 000000000..f0e3e4f1e --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseZeroToOneUpgradeHandler.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; +import org.apache.hudi.table.action.rollback.RollbackUtils; +import org.apache.hudi.table.marker.WriteMarkers; +import org.apache.hudi.table.marker.WriteMarkersFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class BaseZeroToOneUpgradeHandler implements UpgradeHandler { + + @Override + public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { + // fetch pending commit info + HoodieTable table = getTable(config, context); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + if (commits.size() > 0 && instantTime != null) { + // ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch + commits.remove(instantTime); + } + for (String commit : commits) { + // for every pending commit, delete old markers and re-create markers in new format + recreateMarkers(commit, table, context, config.getMarkersDeleteParallelism()); + } + return Collections.emptyMap(); + } + + abstract HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context); + + /** + * Recreate markers in new format. + * Step 1: delete existing markers. + * Step 2: collect all rollback file info. + * Step 3: recreate markers for all interested files. + * + * @param commitInstantTime instant of interest for which markers need to be recreated. + * @param table instance of {@link HoodieTable} to use + * @param context instance of {@link HoodieEngineContext} to use + * @param parallelism parallelism to use + * @throws HoodieRollbackException on any exception during upgrade. 
+ */ + protected void recreateMarkers(final String commitInstantTime, + HoodieTable table, + HoodieEngineContext context, + int parallelism) throws HoodieRollbackException { + try { + // fetch hoodie instant + Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) + .findFirst()); + if (commitInstantOpt.isPresent()) { + // delete existing markers + WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT, table, commitInstantTime); + writeMarkers.quietDeleteMarkerDir(context, parallelism); + + // generate rollback stats + List<ListingBasedRollbackRequest> rollbackRequests; + if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig()); + } else { + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context); + } + List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table.getMetaClient(), table.getConfig(), + context, commitInstantOpt, rollbackRequests); + + // recreate markers adhering to marker based rollback + for (HoodieRollbackStat rollbackStat : rollbackStats) { + for (String path : rollbackStat.getSuccessDeleteFiles()) { + String dataFileName = path.substring(path.lastIndexOf("/") + 1); + // Not feasible to differentiate MERGE from CREATE; hence creating markers with MERGE IOType for all base files. + writeMarkers.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE); + } + for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) { + writeMarkers.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND); + } + } + } + } catch (Exception e) { + throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e); + } + } + + abstract List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt, + List<ListingBasedRollbackRequest> rollbackRequests); + + /** + * Curates file name for marker from existing log file path. + * log file format : partitionpath/.fileid_baseInstant.log.writetoken + * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND + * + * @param logFilePath log file path for which marker file name needs to be generated. + * @return the marker file name thus curated. 
+ */ + private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) { + Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath); + String fileId = FSUtils.getFileIdFromLogPath(logPath); + String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath); + String writeToken = FSUtils.getWriteTokenFromLogPath(logPath); + + return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension()); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java index 2b525841e..7501ed5fa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java @@ -18,9 +18,12 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.config.HoodieWriteConfig; +import java.util.Map; + /** * Interface to assist in downgrading Hoodie table. */ @@ -32,6 +35,7 @@ public interface DowngradeHandler { * @param config instance of {@link HoodieWriteConfig} to be used. * @param context instance of {@link HoodieEngineContext} to be used. * @param instantTime current instant time that should not be touched. + * @return Map of config properties and their values to be added to table properties. */ - void downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime); + Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java index 2ba688f22..8ca6f0e86 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java @@ -18,9 +18,12 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.config.HoodieWriteConfig; +import java.util.Map; + /** * Interface to assist in upgrading Hoodie table. */ @@ -32,6 +35,7 @@ public interface UpgradeHandler { * @param config instance of {@link HoodieWriteConfig} to be used. * @param context instance of {@link HoodieEngineContext} to be used. * @param instantTime current instant time that should not be touched. + * @return Map of config properties and their values to be added to table properties. 
*/ - void upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime); + Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java index af57a9e5c..67376aef5 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngrade.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -25,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpgradeDowngradeException; import java.io.IOException; +import java.util.Map; public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade { public FlinkUpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) { @@ -42,18 +44,22 @@ public class FlinkUpgradeDowngrade extends AbstractUpgradeDowngrade { } @Override - protected void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { - new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime); + return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime); + } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) { + return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); } } @Override - protected void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { - new OneToZeroDowngradeHandler().downgrade(config, context, instantTime); + return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime); + } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) { + return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java new file mode 100644 index 000000000..3aa9f9b27 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler { + + @Override + String getPartitionColumns(HoodieWriteConfig config) { + return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD.key()); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java index e0de6447a..5d6e57e42 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java @@ -20,31 +20,17 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieFlinkTable; -import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; - -import java.util.List; -import java.util.stream.Collectors; +import org.apache.hudi.table.HoodieTable; /** * Downgrade handle to assist in downgrading hoodie table from version 1 to 0. 
*/ -public class OneToZeroDowngradeHandler implements DowngradeHandler { +public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler { @Override - public void downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - // fetch pending commit info - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); - for (HoodieInstant commitInstant : commits) { - // delete existing markers - WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, commitInstant.getTimestamp()); - writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - } + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java new file mode 100644 index 000000000..ec8098aa6 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; + +public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler { + @Override + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 3996765ca..59e94e557 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -21,120 +21,31 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.marker.MarkerType; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; -import org.apache.hudi.table.action.rollback.RollbackUtils; -import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import java.util.List; -import java.util.stream.Collectors; /** * Upgrade handle to assist in upgrading hoodie table from version 0 to 1. 
*/ -public class ZeroToOneUpgradeHandler implements UpgradeHandler { +public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler { @Override - public void upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - // fetch pending commit info - HoodieFlinkTable table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - if (commits.size() > 0 && instantTime != null) { - // ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch - commits.remove(instantTime); - } - for (String commit : commits) { - // for every pending commit, delete old markers and re-create markers in new format - recreateMarkers(commit, table, context, config.getMarkersType(), config.getMarkersDeleteParallelism()); - } + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } - /** - * Recreate markers in new format. - * Step1: Delete existing markers - * Step2: Collect all rollback file info. - * Step3: recreate markers for all interested files. - * - * @param commitInstantTime instant of interest for which markers need to be recreated. - * @param table instance of {@link HoodieFlinkTable} to use - * @param context instance of {@link HoodieEngineContext} to use - * @param markerType marker type to use - * @throws HoodieRollbackException on any exception during upgrade. - */ - private static void recreateMarkers(final String commitInstantTime, - HoodieFlinkTable table, - HoodieEngineContext context, - MarkerType markerType, - int parallelism) throws HoodieRollbackException { - try { - // fetch hoodie instant - Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() - .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) - .findFirst()); - if (commitInstantOpt.isPresent()) { - // delete existing markers - WriteMarkers writeMarkers = WriteMarkersFactory.get(markerType, table, commitInstantTime); - writeMarkers.quietDeleteMarkerDir(context, parallelism); - - // generate rollback stats - List<ListingBasedRollbackRequest> rollbackRequests; - if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { - rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig()); - } else { - rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context); - } - List<HoodieRollbackStat> rollbackStats = new ListingBasedRollbackHelper(table.getMetaClient(), table.getConfig()) - .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests); - - // recreate markers adhering to marker based rollback - for (HoodieRollbackStat rollbackStat : rollbackStats) { - for (String path : rollbackStat.getSuccessDeleteFiles()) { - String dataFileName = path.substring(path.lastIndexOf("/") + 1); - // not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files. 
- writeMarkers.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE); - } - for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) { - writeMarkers.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND); - } - } - } - } catch (Exception e) { - throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e); - } - } - - /** - * Curates file name for marker from existing log file path. - * log file format : partitionpath/.fileid_baseInstant.log.writetoken - * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND - * - * @param logFilePath log file path for which marker file name needs to be generated. - * @return the marker file name thus curated. - */ - private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) { - Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath); - String fileId = FSUtils.getFileIdFromLogPath(logPath); - String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath); - String writeToken = FSUtils.getWriteTokenFromLogPath(logPath); - - return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension()); + @Override + List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt, + List<ListingBasedRollbackRequest> rollbackRequests) { + return new ListingBasedRollbackHelper(metaClient, config) + .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java new file mode 100644 index 000000000..1f4c20382 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToTwoUpgradeHandler.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.config.HoodieWriteConfig; + +/** + * Upgrade handler to assist in upgrading hoodie table from version 1 to 2. 
+ */ +public class OneToTwoUpgradeHandler extends BaseOneToTwoUpgradeHandler { + + @Override + String getPartitionColumns(HoodieWriteConfig config) { + return HoodieSparkUtils.getPartitionColumns(config.getProps()); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java index ce6c8a1b5..2e6064a40 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java @@ -19,31 +19,17 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; - -import java.util.List; -import java.util.stream.Collectors; +import org.apache.hudi.table.HoodieTable; /** * Downgrade handle to assist in downgrading hoodie table from version 1 to 0. */ -public class OneToZeroDowngradeHandler implements DowngradeHandler { +public class OneToZeroDowngradeHandler extends BaseOneToZeroDowngradeHandler { @Override - public void downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - // fetch pending commit info - HoodieSparkTable table = HoodieSparkTable.create(config, context); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - List<HoodieInstant> commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); - for (HoodieInstant commitInstant : commits) { - // delete existing markers - WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, commitInstant.getTimestamp()); - writeMarkers.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - } + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieSparkTable.create(config, context); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java index accdee221..7284db5df 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngrade.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -25,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpgradeDowngradeException; import java.io.IOException; +import java.util.Map; public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade { @@ -43,22 +45,25 @@ public class SparkUpgradeDowngrade extends AbstractUpgradeDowngrade { } catch (IOException e) { throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e); } - } 
@Override - protected void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { - new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime); + return new ZeroToOneUpgradeHandler().upgrade(config, context, instantTime); + } else if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.TWO) { + return new OneToTwoUpgradeHandler().upgrade(config, context, instantTime); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); } } @Override - protected void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { - new OneToZeroDowngradeHandler().downgrade(config, context, instantTime); + return new OneToZeroDowngradeHandler().downgrade(config, context, instantTime); + } else if (fromVersion == HoodieTableVersion.TWO && toVersion == HoodieTableVersion.ONE) { + return new TwoToOneDowngradeHandler().downgrade(config, context, instantTime); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java new file mode 100644 index 000000000..055d33047 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/TwoToOneDowngradeHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; + +/** + * Downgrade handler to assist in downgrading hoodie table from version 2 to 1. 
+ */ +public class TwoToOneDowngradeHandler extends BaseTwoToOneDowngradeHandler { + + @Override + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieSparkTable.create(config, context); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 8470b9685..7bf7209c0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -20,120 +20,31 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.IOType; -import org.apache.hudi.common.table.marker.MarkerType; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; -import org.apache.hudi.table.action.rollback.RollbackUtils; -import org.apache.hudi.table.marker.WriteMarkers; -import org.apache.hudi.table.marker.WriteMarkersFactory; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import java.util.List; -import java.util.stream.Collectors; /** * Upgrade handle to assist in upgrading hoodie table from version 0 to 1. */ -public class ZeroToOneUpgradeHandler implements UpgradeHandler { +public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler { @Override - public void upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime) { - // fetch pending commit info - HoodieSparkTable table = HoodieSparkTable.create(config, context); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - if (commits.size() > 0 && instantTime != null) { - // ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch - commits.remove(instantTime); - } - for (String commit : commits) { - // for every pending commit, delete old markers and re-create markers in new format - recreateMarkers(commit, table, context, config.getMarkersType(), config.getMarkersDeleteParallelism()); - } + HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieSparkTable.create(config, context); } - /** - * Recreate markers in new format. - * Step1: Delete existing markers - * Step2: Collect all rollback file info. - * Step3: recreate markers for all interested files. - * - * @param commitInstantTime instant of interest for which markers need to be recreated. 
- * @param table instance of {@link HoodieSparkTable} to use - * @param context instance of {@link HoodieEngineContext} to use - * @param markerType marker type to use - * @throws HoodieRollbackException on any exception during upgrade. - */ - private static void recreateMarkers(final String commitInstantTime, - HoodieSparkTable table, - HoodieEngineContext context, - MarkerType markerType, - int parallelism) throws HoodieRollbackException { - try { - // fetch hoodie instant - Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() - .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) - .findFirst()); - if (commitInstantOpt.isPresent()) { - // delete existing markers - WriteMarkers writeMarkers = WriteMarkersFactory.get(markerType, table, commitInstantTime); - writeMarkers.quietDeleteMarkerDir(context, parallelism); - - // generate rollback stats - List rollbackRequests; - if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { - rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig()); - } else { - rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context); - } - List rollbackStats = new ListingBasedRollbackHelper(table.getMetaClient(), table.getConfig()) - .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests); - - // recreate markers adhering to marker based rollback - for (HoodieRollbackStat rollbackStat : rollbackStats) { - for (String path : rollbackStat.getSuccessDeleteFiles()) { - String dataFileName = path.substring(path.lastIndexOf("/") + 1); - // not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files. - writeMarkers.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE); - } - for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) { - writeMarkers.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND); - } - } - } - } catch (Exception e) { - throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e); - } - } - - /** - * Curates file name for marker from existing log file path. - * log file format : partitionpath/.fileid_baseInstant.log.writetoken - * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND - * - * @param logFilePath log file path for which marker file name needs to be generated. - * @return the marker file name thus curated. 
-   */
-  private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
-    Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
-    String fileId = FSUtils.getFileIdFromLogPath(logPath);
-    String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
-    String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
-
-    return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension());
+  @Override
+  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt,
+      List<ListingBasedRollbackRequest> rollbackRequests) {
+    return new ListingBasedRollbackHelper(metaClient, config)
+        .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 43daba775..239e101d0 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -18,11 +18,17 @@
 package org.apache.hudi
 
+import java.util.Properties
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
@@ -221,11 +227,41 @@ object HoodieSparkUtils extends SparkAdapterSupport {
         val leftExp = toAttribute(attribute, tableSchema)
         val rightExp = Literal.create(s"%$value%")
         sparkAdapter.createLike(leftExp, rightExp)
-      case _=> null
+      case _ => null
     }
     )
   }
 
+  /**
+   * @param properties config properties
+   * @return partition columns
+   */
+  def getPartitionColumns(properties: Properties): String = {
+    val props = new TypedProperties(properties)
+    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+    getPartitionColumns(keyGenerator, props)
+  }
+
+  /**
+   * @param keyGen key generator
+   * @return partition columns
+   */
+  def getPartitionColumns(keyGen: KeyGenerator, typedProperties: TypedProperties): String = {
+    keyGen match {
+      // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path field format
+      // is: "field_name: field_type"; we extract the field_name from the partition path field.
+ case c: BaseKeyGenerator + if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] => + c.getPartitionPathFields.asScala.map(pathField => + pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX) + .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) + .mkString(",") + + case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",") + case _ => typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD.key()) + } + } + private def toAttribute(columnName: String, tableSchema: StructType): AttributeReference = { val field = tableSchema.find(p => p.name == columnName) assert(field.isDefined, s"Cannot find column: $columnName, Table Columns are: " + diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 6feaf23df..68876d79b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -18,10 +18,6 @@ package org.apache.hudi.table.upgrade; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieConfig; @@ -31,19 +27,31 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.testutils.Assertions; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -52,20 +60,26 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import 
java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP; import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_TYPE_PROP; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; +import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,11 +90,29 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestUpgradeDowngrade extends HoodieClientTestBase { private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1}"; + private static final String TEST_NAME_WITH_DOWNGRADE_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1} and " + + "From version = {2}"; public static Stream configParams() { Object[][] data = new Object[][] { - {true, HoodieTableType.COPY_ON_WRITE}, {false, HoodieTableType.COPY_ON_WRITE}, - {true, HoodieTableType.MERGE_ON_READ}, {false, HoodieTableType.MERGE_ON_READ} + {true, HoodieTableType.COPY_ON_WRITE}, + {false, HoodieTableType.COPY_ON_WRITE}, + {true, HoodieTableType.MERGE_ON_READ}, + {false, HoodieTableType.MERGE_ON_READ} + }; + return Stream.of(data).map(Arguments::of); + } + + public static Stream downGradeConfigParams() { + Object[][] data = new Object[][] { + {true, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.TWO}, + {false, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.TWO}, + {true, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.TWO}, + {false, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.TWO}, + {true, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.ONE}, + {false, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.ONE}, + {true, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.ONE}, + {false, HoodieTableType.MERGE_ON_READ, HoodieTableVersion.ONE} }; return Stream.of(data).map(Arguments::of); } @@ -88,9 +120,9 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { @BeforeEach public void setUp() throws Exception { initSparkContexts(); - initDFS(); + initPath(); initTestDataGenerator(); - initDFSMetaClient(); + initMetaClient(); } @AfterEach @@ -100,21 +132,21 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { @Test public void testLeftOverUpdatedPropFileCleanup() throws IOException { - testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); + testUpgradeZeroToOneInternal(true, true, HoodieTableType.MERGE_ON_READ); } @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) @MethodSource("configParams") - public void testUpgrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { - testUpgradeInternal(false, deletePartialMarkerFiles, tableType); + public void testUpgradeZeroToOne(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + testUpgradeZeroToOneInternal(false, deletePartialMarkerFiles, tableType); } - public void testUpgradeInternal(boolean induceResiduesFromPrevUpgrade, 
boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + public void testUpgradeZeroToOneInternal(boolean induceResiduesFromPrevUpgrade, boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { // init config, table and client. Map params = new HashMap<>(); if (tableType == HoodieTableType.MERGE_ON_READ) { params.put(HOODIE_TABLE_TYPE_PROP.key(), HoodieTableType.MERGE_ON_READ.name()); - metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, HoodieTableType.MERGE_ON_READ); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); } HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); SparkRDDWriteClient client = getHoodieWriteClient(cfg); @@ -151,30 +183,116 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices); // verify hoodie.table.version got upgraded + metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build(); assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ONE.versionCode()); assertTableVersionFromPropertyFile(HoodieTableVersion.ONE); // trigger 3rd commit with marker based rollback enabled. + /* HUDI-2310 List thirdBatch = triggerCommit("003", tableType, true); // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. assertRows(inputRecords.getKey(), thirdBatch); if (induceResiduesFromPrevUpgrade) { assertFalse(dfs.exists(new Path(metaClient.getMetaPath(), SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE))); - } + }*/ } - @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) - @MethodSource("configParams") - public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + public void testUpgradeOneToTwo(HoodieTableType tableType) throws IOException { // init config, table and client. 
Map params = new HashMap<>(); + addNewTableParamsToProps(params); if (tableType == HoodieTableType.MERGE_ON_READ) { params.put(HOODIE_TABLE_TYPE_PROP.key(), HoodieTableType.MERGE_ON_READ.name()); metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); } HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); SparkRDDWriteClient client = getHoodieWriteClient(cfg); + // Write inserts + doInsert(client); + + // downgrade table props + downgradeTableConfigsFromTwoToOne(cfg); + + // perform upgrade + new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.TWO, cfg, context, null); + + // verify hoodie.table.version got upgraded + metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build(); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.TWO.versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.TWO); + + // verify table props + assertTableProps(cfg); + } + + private void addNewTableParamsToProps(Map params) { + params.put(KeyGeneratorOptions.RECORDKEY_FIELD.key(), "uuid"); + params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD.key(), "partition_path"); + params.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP.key(), metaClient.getTableConfig().getTableName()); + params.put(HOODIE_BASE_FILE_FORMAT_PROP.key(), HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().name()); + } + + private void doInsert(SparkRDDWriteClient client) { + // Write 1 (only inserts) + String commit1 = "000"; + client.startCommitWithTime(commit1); + List records = dataGen.generateInserts(commit1, 100); + JavaRDD writeRecords = jsc.parallelize(records, 1); + client.insert(writeRecords, commit1).collect(); + } + + private void downgradeTableConfigsFromTwoToOne(HoodieWriteConfig cfg) throws IOException { + Properties properties = new Properties(cfg.getProps()); + properties.remove(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key()); + properties.remove(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key()); + properties.remove(HoodieTableConfig.HOODIE_TABLE_NAME_PROP.key()); + properties.remove(HOODIE_BASE_FILE_FORMAT_PROP.key()); + properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP.key(), "1"); + + metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType(), properties); + // set hoodie.table.version to 1 in hoodie.properties file + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); + } + + private void assertTableProps(HoodieWriteConfig cfg) { + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + Properties originalProps = cfg.getProps(); + assertEquals(tableConfig.getPartitionFieldProp(), originalProps.getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD.key())); + assertEquals(tableConfig.getRecordKeyFieldProp(), originalProps.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD.key())); + assertEquals(tableConfig.getTableName(), cfg.getTableName()); + assertEquals(tableConfig.getBaseFileFormat().name(), originalProps.getProperty(HOODIE_BASE_FILE_FORMAT_PROP.key())); + } + + @ParameterizedTest(name = TEST_NAME_WITH_DOWNGRADE_PARAMS) + @MethodSource("downGradeConfigParams") + public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType, HoodieTableVersion fromVersion) throws IOException { + MarkerType markerType = 
fromVersion == HoodieTableVersion.TWO ? MarkerType.TIMELINE_SERVER_BASED : MarkerType.DIRECT; + // init config, table and client. + Map params = new HashMap<>(); + if (fromVersion == HoodieTableVersion.TWO) { + addNewTableParamsToProps(params); + } + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP.key(), HoodieTableType.MERGE_ON_READ.name()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(true) + .withMarkersType(markerType.name()).withProps(params).build(); + SparkRDDWriteClient client = getHoodieWriteClient(cfg); + + if (fromVersion == HoodieTableVersion.TWO) { + // set table configs + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_NAME_PROP, cfg.getTableName()); + tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP, cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD.key())); + tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS, cfg.getString(KeyGeneratorOptions.RECORDKEY_FIELD.key())); + tableConfig.setValue(HOODIE_BASE_FILE_FORMAT_PROP, cfg.getString(HOODIE_BASE_FILE_FORMAT_PROP)); + } // prepare data. Make 2 commits, in which 2nd is not committed. List firstPartitionCommit2FileSlices = new ArrayList<>(); @@ -185,7 +303,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get(); // delete one of the marker files in 2nd commit if need be. - WriteMarkers writeMarkers = WriteMarkersFactory.get(getConfig().getMarkersType(), table, commitInstant.getTimestamp()); + WriteMarkers writeMarkers = WriteMarkersFactory.get(markerType, table, commitInstant.getTimestamp()); List markerPaths = new ArrayList<>(writeMarkers.allMarkerFilePaths()); if (deletePartialMarkerFiles) { String toDeleteMarkerFile = markerPaths.get(0); @@ -193,30 +311,55 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { markerPaths.remove(toDeleteMarkerFile); } - // set hoodie.table.version to 1 in hoodie.properties file - prepForDowngrade(); + // set hoodie.table.version to fromVersion in hoodie.properties file + HoodieTableVersion toVersion = HoodieTableVersion.ZERO; + if (fromVersion == HoodieTableVersion.TWO) { + prepForDowngradeFromTwoToOne(); + toVersion = HoodieTableVersion.ONE; + } else { + prepForDowngradeFromOneToZero(); + } // downgrade should be performed. 
all marker files should be deleted - new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, HoodieTableVersion.ZERO, cfg, context, null); + new SparkUpgradeDowngrade(metaClient, cfg, context).run(metaClient, toVersion, cfg, context, null); // assert marker files - assertMarkerFilesForDowngrade(table, commitInstant); + assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE); // verify hoodie.table.version got downgraded - assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode()); - assertTableVersionFromPropertyFile(HoodieTableVersion.ZERO); + metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) + .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build(); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), toVersion.versionCode()); + assertTableVersionFromPropertyFile(toVersion); // trigger 3rd commit with marker based rollback disabled. + /* HUDI-2310 List thirdBatch = triggerCommit("003", tableType, false); // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. assertRows(inputRecords.getKey(), thirdBatch); + */ } - private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant) throws IOException { + private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant, boolean assertExists) throws IOException { // Verify recreated marker files are as expected WriteMarkers writeMarkers = WriteMarkersFactory.get(getConfig().getMarkersType(), table, commitInstant.getTimestamp()); - assertFalse(writeMarkers.doesMarkerDirExist()); + if (assertExists) { + assertTrue(writeMarkers.doesMarkerDirExist()); + assertEquals(0, getTimelineServerBasedMarkerFileCount(table.getMetaClient().getMarkerFolderPath(commitInstant.getTimestamp()), + table.getMetaClient().getFs())); + } else { + assertFalse(writeMarkers.doesMarkerDirExist()); + } + } + + private long getTimelineServerBasedMarkerFileCount(String markerDir, FileSystem fileSystem) throws IOException { + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(markerDir)); + Predicate prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX); + return Arrays.stream(fileStatuses) + .map(fileStatus -> fileStatus.getPath().toString()) + .filter(prefixFilter) + .collect(Collectors.toList()).stream().count(); } private void assertMarkerFilesForUpgrade(HoodieTable table, HoodieInstant commitInstant, List firstPartitionCommit2FileSlices, @@ -349,7 +492,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { //just generate two partitions dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); //1. 
prepare data - HoodieTestDataGenerator.writePartitionMetadata(dfs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, dfsBasePath); + HoodieTestDataGenerator.writePartitionMetadata(metaClient.getFs(), new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); /** * Write 1 (only inserts) */ @@ -396,7 +539,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { return Pair.of(records, records2); } - private void prepForDowngrade() throws IOException { + private void prepForDowngradeFromOneToZero() throws IOException { metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) { @@ -404,13 +547,21 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { } } + private void prepForDowngradeFromTwoToOne() throws IOException { + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.TWO); + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) { + metaClient.getTableConfig().getProps().store(os, ""); + } + } + private void createResidualFile() throws IOException { Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + SparkUpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); // Step1: Copy hoodie.properties to hoodie.properties.orig FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile, - false, metaClient.getHadoopConf()); + false, hadoopConf); } private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java index eb2e200de..6bbc02d82 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java @@ -30,7 +30,9 @@ public enum HoodieTableVersion { // < 0.6.0 versions ZERO(0), // 0.6.0 onwards - ONE(1); + ONE(1), + // 0.9.0 onwards + TWO(2); private final int versionCode; @@ -43,10 +45,10 @@ public enum HoodieTableVersion { } public static HoodieTableVersion current() { - return ONE; + return TWO; } - static HoodieTableVersion versionFromCode(int versionCode) { + public static HoodieTableVersion versionFromCode(int versionCode) { return Arrays.stream(HoodieTableVersion.values()) .filter(v -> v.versionCode == versionCode).findAny() .orElseThrow(() -> new HoodieException("Unknown versionCode:" + versionCode)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java index 9d0965922..f799cb32b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java @@ -179,12 +179,13 @@ public class MarkerUtils { try { if (fileSystem.exists(dirPath)) { FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); - Predicate prefixFilter = pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX); - Predicate markerTypeFilter = - pathStr -> 
!stripMarkerFolderPrefix(pathStr, markerDir).equals(MARKER_TYPE_FILENAME); + Predicate prefixFilter = fileStatus -> + fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX); + Predicate markerTypeFilter = fileStatus -> + !fileStatus.getPath().getName().equals(MARKER_TYPE_FILENAME); List markerDirSubPaths = Arrays.stream(fileStatuses) - .map(fileStatus -> fileStatus.getPath().toString()) .filter(prefixFilter.and(markerTypeFilter)) + .map(fileStatus -> fileStatus.getPath().toString()) .collect(Collectors.toList()); if (markerDirSubPaths.size() > 0) { @@ -216,4 +217,4 @@ public class MarkerUtils { throw new HoodieIOException(ioe.getMessage(), ioe); } } -} +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 2c8d33e56..87e7b2202 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -18,6 +18,9 @@ package org.apache.hudi +import java.util +import java.util.Properties + import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration @@ -29,9 +32,8 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType} -import org.apache.hudi.common.table.TableSchemaResolver import org.apache.hudi.common.table.timeline.HoodieActiveTimeline -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} @@ -51,8 +53,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkContext} -import java.util -import java.util.Properties import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -118,7 +118,7 @@ object HoodieSparkSqlWriter { } else { // Handle various save modes handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs) - val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator) + val partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, toProperties(parameters)) // Create the table if not present if (!tableExists) { val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index ef26ee55c..da06cbc72 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -17,13 +17,13 @@ package org.apache.hudi +import java.util.Properties + import 
org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig.{METADATA_ENABLE_PROP, METADATA_VALIDATE_PROP} import org.apache.hudi.common.config.{HoodieConfig, TypedProperties} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory -import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator} -import java.util.Properties import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConverters.{mapAsScalaMapConverter, _} @@ -96,23 +96,7 @@ object HoodieWriterUtils { val props = new TypedProperties() props.putAll(parameters.asJava) val keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) - getPartitionColumns(keyGen) - } - - def getPartitionColumns(keyGen: KeyGenerator): String = { - keyGen match { - // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format - // is: "field_name: field_type", we extract the field_name from the partition path field. - case c: BaseKeyGenerator - if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] => - c.getPartitionPathFields.asScala.map(pathField => - pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX) - .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) - .mkString(",") - - case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",") - case _=> null - } + HoodieSparkUtils.getPartitionColumns(keyGen, props) } def convertMapToHoodieConfig(parameters: Map[String, String]): HoodieConfig = { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 9075aae8e..68f98bfe2 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -273,7 +273,6 @@ public class MarkerDirState implements Serializable { private void syncMarkersFromFileSystem() { Map> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( markerDirPath, fileSystem, hoodieEngineContext, parallelism); - for (String markersFilePathStr : fileMarkersSetMap.keySet()) { Set fileMarkers = fileMarkersSetMap.get(markersFilePathStr); if (!fileMarkers.isEmpty()) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index f4597fd57..2f9d148db 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.HoodieSparkUtils; -import org.apache.hudi.HoodieWriterUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -50,7 +49,6 @@ import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; -import 
org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; @@ -62,6 +60,7 @@ import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; +import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaSet; @@ -249,7 +248,7 @@ public class DeltaSync implements Serializable { } } else { this.commitTimelineOpt = Option.empty(); - String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); + String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props); HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) .setTableName(cfg.targetTableName) @@ -353,7 +352,7 @@ public class DeltaSync implements Serializable { } } } else { - String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); + String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props); HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) .setTableName(cfg.targetTableName)
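
A note on the upgrade entry point exercised throughout TestUpgradeDowngrade above: SparkUpgradeDowngrade#run reads hoodie.table.version from hoodie.properties and, when the table is behind the requested version, applies the one-step handlers (ZeroToOne, OneToTwo, and their downgrade counterparts) in sequence. The sketch below shows how a caller might drive it outside the test harness; the base path, table name, and the Hadoop configuration/engine context arguments are assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;

public class UpgradeToTwoSketch {

  // hadoopConf and context are assumed to come from an existing Spark
  // application (e.g. a Spark engine context wrapping the JavaSparkContext).
  public static void upgrade(Configuration hadoopConf, HoodieEngineContext context, String basePath) {
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)        // hypothetical table location
        .forTable("sample_table")  // hypothetical table name
        .build();
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();
    // Same call as in TestUpgradeDowngrade: reads the current table version
    // and steps it up to TWO, adding the new table properties along the way.
    new SparkUpgradeDowngrade(metaClient, cfg, context)
        .run(metaClient, HoodieTableVersion.TWO, cfg, context, null);
  }
}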
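
On the HoodieTableVersion hunk: current() now answers TWO, and versionFromCode becomes public so code outside the package (the CLI command, for instance) can turn a numeric toVersion argument into the enum. A small illustration, assuming only what the hunk itself shows:

import org.apache.hudi.common.table.HoodieTableVersion;

public class TableVersionSketch {
  public static void main(String[] args) {
    // Known codes map to their enum constants ...
    HoodieTableVersion target = HoodieTableVersion.versionFromCode(2);
    System.out.println(target == HoodieTableVersion.TWO);  // true
    System.out.println(HoodieTableVersion.current());      // TWO as of 0.9.0
    // ... while an unknown code (e.g. 3 at this point) throws HoodieException.
  }
}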
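
The MarkerUtils change above is subtle but worth calling out: the filters now operate on FileStatus and compare only the file name, using startsWith(MARKERS_FILENAME_PREFIX) instead of contains on the full path string, so a marker directory whose absolute path happens to contain "MARKERS" no longer yields false positives; the prefix filter also runs before the map. A condensed sketch of the patched listing logic follows; the literal prefix and type-file values are assumptions inferred from the constant names.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MarkerListingSketch {

  // Condensed from the patched readTimelineServerBasedMarkersFromFileSystem:
  // keep MARKERS* files, drop the marker-type file, then materialize paths.
  static List<String> listMarkerFiles(FileSystem fs, String markerDir) throws IOException {
    final String markersPrefix = "MARKERS";        // assumed value of MARKERS_FILENAME_PREFIX
    final String markerTypeFile = "MARKERS.type";  // assumed value of MARKER_TYPE_FILENAME
    FileStatus[] statuses = fs.listStatus(new Path(markerDir));
    Predicate<FileStatus> prefixFilter = s -> s.getPath().getName().startsWith(markersPrefix);
    Predicate<FileStatus> markerTypeFilter = s -> !s.getPath().getName().equals(markerTypeFile);
    return Arrays.stream(statuses)
        .filter(prefixFilter.and(markerTypeFilter))
        .map(s -> s.getPath().toString())
        .collect(Collectors.toList());
  }
}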
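
Finally, the getPartitionColumns consolidation: the key-generator-aware resolution moves from HoodieWriterUtils into HoodieSparkUtils, gains a Properties overload, and falls back to the configured partition path field for key generators that do not extend BaseKeyGenerator; DeltaSync and HoodieSparkSqlWriter now both route through it. Called from Java, as DeltaSync does, usage looks roughly like the sketch below; the property values are illustrative only.

import java.io.IOException;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;

public class PartitionColumnsSketch {

  public static String resolve() throws IOException {
    TypedProperties props = new TypedProperties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD.key(), "uuid");
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD.key(), "partition_path");
    // With a simple key generator this returns "partition_path"; with
    // CustomKeyGenerator a field declared as "partition_path:simple" resolves
    // to the same name, since only the part before ':' is kept.
    KeyGenerator keyGen = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
    return HoodieSparkUtils.getPartitionColumns(keyGen, props);
  }
}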