From ff53e8f0b62448815a487285ce0cc337398272f2 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sun, 9 Aug 2020 18:32:43 -0400 Subject: [PATCH] [HUDI-1014] Adding Upgrade and downgrade infra for smooth transitioning from list based rollback to marker based rollback (#1858) - This pull request adds upgrade/downgrade infra for a smooth transition from list based rollback to marker based rollback - A new property called hoodie.table.version is added to the hoodie.properties file as part of this. Whenever hoodie is launched with a newer table version, i.e. 1 (or moving from pre 0.6.0 to 0.6.0), an upgrade step will be executed automatically to adhere to marker based rollback. - This automatic upgrade step will happen just once per dataset, as the hoodie.table.version will be updated in the property file after the upgrade is completed once - Similarly, a command line tool for downgrading is added in case a user wants to downgrade hoodie from table version 1 to 0, or move from hoodie 0.6.0 to pre 0.6.0 - *Added UpgradeDowngrade to assist in upgrading or downgrading hoodie table* - *Added Interfaces for upgrade and downgrade and concrete implementations for upgrading from 0 to 1 and downgrading from 1 to 0.* - *Made some changes to ListingBasedRollbackHelper to expose just rollback stats w/o performing actual rollback, which will be consumed by Upgrade infra* - Reworking failure handling for upgrade/downgrade - Changed tests accordingly, added one test around leftover cleanup - New tables now write table version into hoodie.properties - Clean up code naming, abstractions. 
Co-authored-by: Vinoth Chandar --- .../apache/hudi/cli/commands/SparkMain.java | 41 +- .../commands/UpgradeOrDowngradeCommand.java | 81 ++++ .../hudi/cli/commands/TestRepairsCommand.java | 4 +- .../commands/TestUpgradeDowngradeCommand.java | 128 ++++++ .../client/AbstractHoodieWriteClient.java | 11 +- .../apache/hudi/client/HoodieWriteClient.java | 20 +- .../rollback/BaseRollbackActionExecutor.java | 32 +- .../CopyOnWriteRollbackActionExecutor.java | 43 +- .../rollback/ListingBasedRollbackHelper.java | 110 +++-- .../MergeOnReadRollbackActionExecutor.java | 172 +------- .../table/action/rollback/RollbackUtils.java | 173 ++++++++ .../hudi/table/upgrade/DowngradeHandler.java | 38 ++ .../HoodieUpgradeDowngradeException.java | 32 ++ .../upgrade/OneToZeroDowngradeHandler.java | 49 +++ .../hudi/table/upgrade/UpgradeDowngrade.java | 153 +++++++ .../hudi/table/upgrade/UpgradeHandler.java | 38 ++ .../upgrade/ZeroToOneUpgradeHandler.java | 132 ++++++ .../table/upgrade/TestUpgradeDowngrade.java | 408 ++++++++++++++++++ .../hudi/testutils/HoodieClientTestUtils.java | 25 ++ .../hudi/common/table/HoodieTableConfig.java | 33 +- .../common/table/HoodieTableMetaClient.java | 8 +- .../hudi/common/table/HoodieTableVersion.java | 54 +++ 22 files changed, 1521 insertions(+), 264 deletions(-) create mode 100644 hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java create mode 100644 hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java create mode 100644 
hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java create mode 100644 hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 5f9509840..e6a3f529c 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -23,7 +23,10 @@ import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.client.utils.ClientUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieBootstrapConfig; @@ -31,6 +34,7 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; import org.apache.hudi.utilities.HDFSParquetImporter; import org.apache.hudi.utilities.HDFSParquetImporter.Config; @@ -64,7 +68,7 @@ public class SparkMain { */ enum SparkCommand { BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, - COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, 
COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT + COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE } public static void main(String[] args) throws Exception { @@ -185,6 +189,11 @@ public class SparkMain { returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs); break; + case UPGRADE: + case DOWNGRADE: + assert (args.length == 5); + returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]); + break; default: break; } @@ -380,9 +389,35 @@ public class SparkMain { } } + /** + * Upgrade or downgrade table. + * + * @param jsc instance of {@link JavaSparkContext} to use. + * @param basePath base path of the dataset. + * @param toVersion version to which upgrade/downgrade to be done. + * @return 0 if success, else -1. + * @throws Exception + */ + protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) { + HoodieWriteConfig config = getWriteConfig(basePath); + HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, false); + try { + UpgradeDowngrade.run(metaClient, HoodieTableVersion.valueOf(toVersion), config, jsc, null); + LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion)); + return 0; + } catch (Exception e) { + LOG.warn(String.format("Failed: Could not upgrade/downgrade table at \"%s\" to version \"%s\".", basePath, toVersion), e); + return -1; + } + } + private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); + HoodieWriteConfig config = getWriteConfig(basePath); return new 
HoodieWriteClient(jsc, config); } + + private static HoodieWriteConfig getWriteConfig(String basePath) { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); + } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java new file mode 100644 index 000000000..deb9e0727 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.commands.SparkMain.SparkCommand; +import org.apache.hudi.cli.utils.InputStreamConsumer; +import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import org.apache.spark.launcher.SparkLauncher; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; + +/** + * CLI command to assist in upgrading/downgrading Hoodie dataset to a different version. + */ +public class UpgradeOrDowngradeCommand implements CommandMarker { + + @CliCommand(value = "upgrade hoodie dataset ", help = "Upgrades hoodie dataset") + public String upgradeHoodieDataset( + @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = "Spark executor memory") final String sparkMemory) + throws Exception { + + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.UPGRADE.toString(), master, sparkMemory, metaClient.getBasePath(), toVersion); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + HoodieCLI.refreshTableMetadata(); + if (exitCode != 0) { + return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion); + } + return String.format("Hoodie dataset upgraded/downgraded to ", toVersion); + } + + 
@CliCommand(value = "downgrade hoodie dataset ", help = "Upgrades hoodie dataset") + public String downgradeHoodieDataset( + @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = "Spark executor memory") final String sparkMemory) + throws Exception { + + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.DOWNGRADE.toString(), master, sparkMemory, metaClient.getBasePath(), toVersion); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + HoodieCLI.refreshTableMetadata(); + if (exitCode != 0) { + return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion); + } + return String.format("Hoodie dataset upgraded/downgraded to ", toVersion); + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index afad53b93..ed14a64fc 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -167,10 +167,10 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest { assertEquals(expected, result); // check result - List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", + List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", "hoodie.table.version", "hoodie.archivelog.folder", 
"hoodie.timeline.layout.version"); String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key, - oldProps.getOrDefault(key, null), result.getOrDefault(key, null)}) + oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")}) .toArray(String[][]::new); String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY, HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java new file mode 100644 index 000000000..4c0479a58 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link UpgradeOrDowngradeCommand}. 
+ */ +public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest { + + private String tablePath; + + @BeforeEach + public void init() throws IOException { + String tableName = "test_table"; + tablePath = basePath + File.separator + tableName; + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + + //Create some commits files and parquet files + String commitTime1 = "100"; + String commitTime2 = "101"; + HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); + + // one commit file + HoodieTestUtils.createCommitFiles(tablePath, commitTime1); + // one .inflight commit file + HoodieTestUtils.createInflightCommitFiles(tablePath, commitTime2); + + // generate commit files for commit 100 + for (String commitTime : Arrays.asList(commitTime1)) { + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + } + + // generate commit and marker files for inflight commit 101 + for (String commitTime : Arrays.asList(commitTime2)) { + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieTestUtils.createDataFile(tablePath, 
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + } + } + + @Test + public void testDowngradeCommand() throws Exception { + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + // update hoodie.table.version to 1 + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); + try (FSDataOutputStream os = metaClient.getFs().create(new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE), true)) { + metaClient.getTableConfig().getProperties().store(os, ""); + } + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + // verify marker files for inflight commit exists + for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) { + assertEquals(1, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101")); + } + + SparkMain.upgradeOrDowngradeTable(jsc, tablePath, HoodieTableVersion.ZERO.name()); + + // verify hoodie.table.version got downgraded + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + // verify hoodie.table.version + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode()); + assertTableVersionFromPropertyFile(); + + // verify marker files are non existant + for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) { + assertEquals(0, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101")); + } + } + + private void assertTableVersionFromPropertyFile() throws IOException { + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + // Load the properties and verify + FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile); + Properties prop = new Properties(); + 
prop.load(fsDataInputStream); + fsDataInputStream.close(); + assertEquals(Integer.toString(HoodieTableVersion.ZERO.versionCode()), prop.getProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME)); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 644aca9da..81b016121 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -38,6 +39,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -198,10 +201,16 @@ public abstract class AbstractHoodieWriteClient e * Get HoodieTable and init {@link Timer.Context}. 
* * @param operationType write operation type + * @param instantTime current inflight instant time * @return HoodieTable */ - protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) { + protected HoodieTable getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); + UpgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, jsc, instantTime); + return getTableAndInitCtx(metaClient, operationType); + } + + private HoodieTable getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { if (operationType == WriteOperationType.DELETE) { setWriteSchemaForDeletes(metaClient); } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 2a1520a77..9f6df7bdf 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -156,7 +156,7 @@ public class HoodieWriteClient extends AbstractHo if (rollbackPending) { rollBackInflightBootstrap(); } - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS); table.bootstrap(jsc, extraMetadata); } @@ -186,7 +186,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsert(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -207,7 +207,7 @@ public class 
HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT_PREPPED); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -226,7 +226,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insert(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -246,7 +246,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT_PREPPED); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -285,8 +285,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsert(JavaRDD> records, final String instantTime, - Option userDefinedBulkInsertPartitioner) { - HoodieTable table = 
getTableAndInitCtx(WriteOperationType.BULK_INSERT); + Option userDefinedBulkInsertPartitioner) { + HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -311,8 +311,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String instantTime, - Option bulkInsertPartitioner) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED); + Option bulkInsertPartitioner) { + HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT_PREPPED); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -329,7 +329,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD delete(JavaRDD keys, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.DELETE); + HoodieTable table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); setOperationType(WriteOperationType.DELETE); HoodieWriteMetadata result = table.delete(jsc,instantTime, keys); return postWrite(result, instantTime, table); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 268069080..ce2c8ca10 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.BaseActionExecutor; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -51,6 +52,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor execute(HoodieInstant instantToRollback); } @@ -60,23 +62,23 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor table, - String instantTime, - HoodieInstant instantToRollback, - boolean deleteInstants) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants) { this(jsc, config, table, instantTime, instantToRollback, deleteInstants, false, config.shouldRollbackUsingMarkers()); } public BaseRollbackActionExecutor(JavaSparkContext jsc, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant instantToRollback, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { super(jsc, config, table, instantTime); this.instantToRollback = instantToRollback; this.deleteInstants = deleteInstants; @@ -84,7 +86,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor executeRollbackUsingFileListing(HoodieInstant instantToRollback) { - List rollbackRequests = generateRollbackRequestsByListing(); + List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + 
config.shouldAssumeDatePartitioning()); return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index d7304fa1c..3c94df489 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; @@ -68,34 +69,48 @@ public class ListingBasedRollbackHelper implements Serializable { * Performs all rollback actions that we have collected in parallel. 
*/ public List performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List rollbackRequests) { - SerializablePathFilter filter = (path) -> { - if (path.toString().endsWith(this.metaClient.getTableConfig().getBaseFileFormat().getFileExtension())) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return instantToRollback.getTimestamp().equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return instantToRollback.getTimestamp().equals(fileCommitTime); - } - return false; - }; - int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); jsc.setJobGroup(this.getClass().getSimpleName(), "Perform rollback actions"); + JavaPairRDD partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(jsc, instantToRollback, rollbackRequests, sparkPartitions, true); + return partitionPathRollbackStatsPairRDD.reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect(); + } + + /** + * Collect all file info that needs to be rollbacked. + */ + public List collectRollbackStats(JavaSparkContext jsc, HoodieInstant instantToRollback, List rollbackRequests) { + int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); + jsc.setJobGroup(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade"); + JavaPairRDD partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(jsc, instantToRollback, rollbackRequests, sparkPartitions, false); + return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect(); + } + + /** + * May be delete interested files and collect stats or collect stats only. + * + * @param jsc instance of {@link JavaSparkContext} to use. + * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. 
+ * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. + * @param sparkPartitions number of spark partitions to use for parallelism. + * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes. + * @return stats collected with or w/o actual deletions. + */ + JavaPairRDD maybeDeleteAndCollectStats(JavaSparkContext jsc, HoodieInstant instantToRollback, List rollbackRequests, + int sparkPartitions, boolean doDelete) { return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> { switch (rollbackRequest.getType()) { case DELETE_DATA_FILES_ONLY: { - final Map filesToDeletedStatus = deleteCleanedFiles(metaClient, config, instantToRollback.getTimestamp(), - rollbackRequest.getPartitionPath()); + final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(), + rollbackRequest.getPartitionPath(), doDelete); return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); } case DELETE_DATA_AND_LOG_FILES: { - final Map filesToDeletedStatus = deleteCleanedFiles(metaClient, config, rollbackRequest.getPartitionPath(), filter); + final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); } 
case APPEND_ROLLBACK_BLOCK: { Writer writer = null; @@ -107,9 +122,11 @@ public class ListingBasedRollbackHelper implements Serializable { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata - Map header = generateHeader(instantToRollback.getTimestamp()); - // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); + if (doDelete) { + Map header = generateHeader(instantToRollback.getTimestamp()); + // if update belongs to an existing log file + writer = writer.appendBlock(new HoodieCommandBlock(header)); + } } catch (IOException | InterruptedException io) { throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); } finally { @@ -130,30 +147,46 @@ public class ListingBasedRollbackHelper implements Serializable { 1L ); return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); } - }).reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect(); + }); } - /** * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
*/ - private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String partitionPath, PathFilter filter) throws IOException { + private Map deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, boolean doDelete) throws IOException { LOG.info("Cleaning path " + partitionPath); + String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + SerializablePathFilter filter = (path) -> { + if (path.toString().endsWith(basefileExtension)) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commit.equals(fileCommitTime); + } else if (FSUtils.isLogFile(path)) { + // Since the baseCommitTime is the only commit for new log files, it's okay here + String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); + return commit.equals(fileCommitTime); + } + return false; + }; + final Map results = new HashMap<>(); FileSystem fs = metaClient.getFs(); FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); + if (doDelete) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + LOG.info("Delete file " + file.getPath() + "\t" + success); + } else { + results.put(file, true); + } } return results; } @@ -161,8 +194,8 @@ public class ListingBasedRollbackHelper implements Serializable { /** * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
*/ - private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath) throws IOException { + private Map deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, boolean doDelete) throws IOException { final Map results = new HashMap<>(); LOG.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); @@ -176,9 +209,13 @@ public class ListingBasedRollbackHelper implements Serializable { }; FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); + if (doDelete) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + LOG.info("Delete file " + file.getPath() + "\t" + success); + } else { + results.put(file, true); + } } return results; } @@ -194,5 +231,6 @@ public class ListingBasedRollbackHelper implements Serializable { } public interface SerializablePathFilter extends PathFilter, Serializable { + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java index c29cefaad..97e110dc0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -19,18 +19,12 @@ package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import 
org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -38,31 +32,28 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecutor { private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class); public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { super(jsc, config, table, instantTime, commitInstant, deleteInstants); } public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); } @@ -90,7 
+81,6 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks. - // For Requested State (like failure during index lookup), there is nothing to do rollback other than // deleting the timeline file if (!resolvedInstant.isRequested()) { @@ -110,144 +100,10 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto protected List executeRollbackUsingFileListing(HoodieInstant resolvedInstant) { List rollbackRequests; try { - rollbackRequests = generateRollbackRequestsUsingFileListing(resolvedInstant); + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, jsc); } catch (IOException e) { throw new HoodieIOException("Error generating rollback requests by file listing.", e); } return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests); } - - /** - * Generate all rollback requests that we need to perform for rolling back this action without actually performing - * rolling back. 
- * - * @param instantToRollback Instant to Rollback - * @return list of rollback requests - */ - private List generateRollbackRequestsUsingFileListing(HoodieInstant instantToRollback) throws IOException { - String commit = instantToRollback.getTimestamp(); - List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), - config.shouldAssumeDatePartitioning()); - int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); - jsc.setJobGroup(this.getClass().getSimpleName(), "Generate all rollback requests"); - return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { - HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); - List partitionRollbackRequests = new ArrayList<>(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - LOG.info("Rolling back commit action."); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. - boolean higherDeltaCommits = - !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created parquet files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - LOG.info("Rolling back compaction. There are higher delta commits. 
So only deleting data files"); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath)); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. - LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" - + " log files"); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. 
- // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. 
- try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getMetaClient().getCommitTimeline() - .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), - HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); - } - break; - } catch (IOException io) { - throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io); - } - default: - break; - } - return partitionRollbackRequests.iterator(); - }).filter(Objects::nonNull).collect(); - } - - private List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, - HoodieCommitMetadata commitMetadata) { - ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); - - // wStat.getPrevCommit() might not give the right commit time in the following - // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be - // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. 
- // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the - // baseCommit always by listing the file slice - Map fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath) - .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { - - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); - - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - ValidationUtils - .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), - HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp())); - } - - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime); - }).collect(Collectors.toList()); - } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index cbb5a2c82..3bfd645e6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -19,19 +19,39 @@ package org.apache.hudi.table.action.rollback; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import java.io.IOException; import 
java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; public class RollbackUtils { + private static final Logger LOG = LogManager.getLogger(RollbackUtils.class); + static Map generateHeader(String instantToRollback, String rollbackInstantTime) { // generate metadata Map header = new HashMap<>(3); @@ -54,6 +74,7 @@ public class RollbackUtils { final List successDeleteFiles = new ArrayList<>(); final List failedDeleteFiles = new ArrayList<>(); final Map commandBlocksCount = new HashMap<>(); + final List filesToRollback = new ArrayList<>(); Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll); Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll); Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll); @@ -63,4 +84,156 @@ public class RollbackUtils { return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); } + /** + * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type. + * @param fs instance of {@link FileSystem} to use. + * @param basePath base path of interest. + * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise. + * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected. 
+ */ + public static List generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) { + try { + return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream() + .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new HoodieIOException("Error generating rollback requests", e); + } + } + + /** + * Generate all rollback requests that we need to perform for rolling back this action without actually performing rolling back for MOR table type. + * + * @param instantToRollback Instant to Rollback + * @param table instance of {@link HoodieTable} to use. + * @param jsc instance of {@link JavaSparkContext} to use. + * @return list of rollback requests + */ + public static List generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, JavaSparkContext jsc) throws IOException { + String commit = instantToRollback.getTimestamp(); + HoodieWriteConfig config = table.getConfig(); + List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + config.shouldAssumeDatePartitioning()); + int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); + jsc.setJobGroup(RollbackUtils.class.getSimpleName(), "Generate all rollback requests"); + return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { + HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); + List partitionRollbackRequests = new ArrayList<>(); + switch (instantToRollback.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + LOG.info("Rolling back commit action."); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + break; + case HoodieTimeline.COMPACTION_ACTION: 
+ // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. + boolean higherDeltaCommits = + !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath)); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. + LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" + + " log files"); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + } + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. 
+ // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. 
+ try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline() + .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) + .get(), + HoodieCommitMetadata.class); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); + } + break; + } catch (IOException io) { + throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io); + } + default: + break; + } + return partitionRollbackRequests.iterator(); + }).filter(Objects::nonNull).collect(); + } + + private static List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, + HoodieCommitMetadata commitMetadata, HoodieTable table) { + ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + + // wStat.getPrevCommit() might not give the right commit time in the following + // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be + // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. 
+ // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the + // baseCommit always by listing the file slice + Map fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath) + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); + return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { + + // Filter out stats without prevCommit since they are all inserts + boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) + && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + + if (validForRollback) { + // For sanity, log instant time can never be less than base-commit on which we are rolling back + ValidationUtils + .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), + HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp())); + } + + return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( + // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option + // to delete and we should not step on it + wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); + }).map(wStat -> { + String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); + return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), + baseCommitTime); + }).collect(Collectors.toList()); + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java new file mode 100644 index 000000000..948e44c34 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Interface to assist in downgrading Hoodie table. + */ +public interface DowngradeHandler { + + /** + * to be invoked to downgrade hoodie table from one version to a lower version. 
+ * + * @param config instance of {@link HoodieWriteConfig} to be used. + * @param jsc instance of {@link JavaSparkContext} to be used. + * @param instantTime current instant time that should not be touched. + */ + void downgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime); +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java new file mode 100644 index 000000000..c7b45eabc --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieUpgradeDowngradeException extends HoodieException { + + public HoodieUpgradeDowngradeException(String msg, Throwable t) { + super(msg, t); + } + + public HoodieUpgradeDowngradeException(int fromVersion, int toVersion, boolean upgrade) { + super(String.format("Cannot %s from version %s -> %s", upgrade ?
"upgrade" : "downgrade", fromVersion, toVersion), null); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java new file mode 100644 index 000000000..108bdbbf0 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; + +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Downgrade handler to assist in downgrading hoodie table from version 1 to 0.
+ */ +public class OneToZeroDowngradeHandler implements DowngradeHandler { + + @Override + public void downgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime) { + // fetch pending commit info + HoodieTable table = HoodieTable.create(config, jsc.hadoopConfiguration()); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + List commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); + for (HoodieInstant commitInstant : commits) { + // delete existing marker files + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + markerFiles.quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism()); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java new file mode 100644 index 000000000..b72c42d48 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.Date; +import java.util.Properties; + +/** + * Helper class to assist in upgrading/downgrading Hoodie when there is a version change. + */ +public class UpgradeDowngrade { + + private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class); + public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated"; + + private HoodieTableMetaClient metaClient; + private HoodieWriteConfig config; + private JavaSparkContext jsc; + private transient FileSystem fs; + private Path updatedPropsFilePath; + private Path propsFilePath; + + /** + * Perform Upgrade or Downgrade steps if required and updated table version if need be. + *

+ * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths. + * + * Essentially, if a dataset was created using any pre 0.6.0 (e.g. 0.5.3), and Hoodie version was upgraded to 0.6.0, + * Hoodie table version gets bumped to 1 and there are some upgrade steps that need to be executed before doing any writes. + * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded + * to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes. + * + * On a high level, these are the steps performed + * + * Step1 : Understand current hoodie table version and table version from hoodie.properties file + * Step2 : Delete any left over .updated from previous upgrade/downgrade + * Step3 : If versions are different, perform upgrade/downgrade. + * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated + * Step5 : Rename hoodie.properties.updated to hoodie.properties + *

+ * + * @param metaClient instance of {@link HoodieTableMetaClient} to use + * @param toVersion version to which upgrade or downgrade has to be done. + * @param config instance of {@link HoodieWriteConfig} to use. + * @param jsc instance of {@link JavaSparkContext} to use. + * @param instantTime current instant time that should not be touched. + */ + public static void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config, + JavaSparkContext jsc, String instantTime) { + try { + new UpgradeDowngrade(metaClient, config, jsc).run(toVersion, instantTime); + } catch (IOException e) { + throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e); + } + } + + private UpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, JavaSparkContext jsc) { + this.metaClient = metaClient; + this.config = config; + this.jsc = jsc; + this.fs = metaClient.getFs(); + this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE); + this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); + } + + private void run(HoodieTableVersion toVersion, String instantTime) throws IOException { + // Fetch version from property file and current version + HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion(); + if (toVersion.versionCode() == fromVersion.versionCode()) { + return; + } + + if (fs.exists(updatedPropsFilePath)) { + // this can be left over .updated file from a failed attempt before. Many cases exist here. + // a) We failed while writing the .updated file and it's content is partial (e.g hdfs) + // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway + // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file + // All cases, it simply suffices to delete the file and proceed. 
+ LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath))); + fs.delete(updatedPropsFilePath, false); + } + + // Perform the actual upgrade/downgrade; this has to be idempotent, for now. + LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion); + if (fromVersion.versionCode() < toVersion.versionCode()) { + // upgrade + upgrade(fromVersion, toVersion, instantTime); + } else { + // downgrade + downgrade(fromVersion, toVersion, instantTime); + } + + // Write out the current version in hoodie.properties.updated file + metaClient.getTableConfig().setTableVersion(toVersion); + createUpdatedFile(metaClient.getTableConfig().getProperties()); + + // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores. + // But as long as this does not leave a partial hoodie.properties file, we are okay. + fs.rename(updatedPropsFilePath, propsFilePath); + } + + private void createUpdatedFile(Properties props) throws IOException { + try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) { + props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); + } + } + + private void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { + new ZeroToOneUpgradeHandler().upgrade(config, jsc, instantTime); + } else { + throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); + } + } + + private void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { + new OneToZeroDowngradeHandler().downgrade(config, jsc, instantTime); + } else { + throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); 
+ } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java new file mode 100644 index 000000000..4d56143a7 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Interface to assist in upgrading Hoodie table. + */ +public interface UpgradeHandler { + + /** + * to be invoked to upgrade hoodie table from one version to a higher version. + * + * @param config instance of {@link HoodieWriteConfig} to be used. + * @param jsc instance of {@link JavaSparkContext} to be used. + * @param instantTime current instant time that should not be touched. 
+ */ + void upgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime); +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java new file mode 100644 index 000000000..4960ff527 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.io.IOType; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; +import org.apache.hudi.table.action.rollback.RollbackUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Upgrade handler to assist in upgrading hoodie table from version 0 to 1.
+ */ +public class ZeroToOneUpgradeHandler implements UpgradeHandler { + + @Override + public void upgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime) { + // fetch pending commit info + HoodieTable table = HoodieTable.create(config, jsc.hadoopConfiguration()); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + if (commits.size() > 0 && instantTime != null) { + // ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch + commits.remove(instantTime); + } + for (String commit : commits) { + // for every pending commit, delete old marker files and re-create marker files in new format + recreateMarkerFiles(commit, table, jsc, config.getMarkersDeleteParallelism()); + } + } + + /** + * Recreate marker files in new format. + * Step1: Delete existing marker files + * Step2: Collect all rollback file info. + * Step3: recreate marker files for all interested files. + * + * @param commitInstantTime instant of interest for which marker files need to be recreated. + * @param table instance of {@link HoodieTable} to use + * @param jsc instance of {@link JavaSparkContext} to use + * @throws HoodieRollbackException on any exception during upgrade. 
+ */ + private static void recreateMarkerFiles(final String commitInstantTime, HoodieTable table, JavaSparkContext jsc, int parallelism) throws HoodieRollbackException { + try { + // fetch hoodie instant + Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) + .findFirst()); + if (commitInstantOpt.isPresent()) { + // delete existing marker files + MarkerFiles markerFiles = new MarkerFiles(table, commitInstantTime); + markerFiles.quietDeleteMarkerDir(jsc, parallelism); + + // generate rollback stats + List rollbackRequests; + if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + table.getConfig().shouldAssumeDatePartitioning()); + } else { + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, jsc); + } + List rollbackStats = new ListingBasedRollbackHelper(table.getMetaClient(), table.getConfig()) + .collectRollbackStats(jsc, commitInstantOpt.get(), rollbackRequests); + + // recreate marker files adhering to marker based rollback + for (HoodieRollbackStat rollbackStat : rollbackStats) { + for (String path : rollbackStat.getSuccessDeleteFiles()) { + String dataFileName = path.substring(path.lastIndexOf("/") + 1); + // not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files. 
+ markerFiles.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE); + } + for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) { + markerFiles.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND); + } + } + } + } catch (Exception e) { + throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e); + } + } + + /** + * Curates file name for marker from existing log file path. + * log file format : partitionpath/.fileid_baseInstant.log.writetoken + * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND + * + * @param logFilePath log file path for which marker file name needs to be generated. + * @return the marker file name thus curated. + */ + private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) { + Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath); + String fileId = FSUtils.getFileIdFromLogPath(logPath); + String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath); + String writeToken = FSUtils.getWriteTokenFromLogPath(logPath); + + return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension()); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java new file mode 100644 index 000000000..f40bd56e0 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.testutils.Assertions; +import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests {@link UpgradeDowngrade}. + */ +public class TestUpgradeDowngrade extends HoodieClientTestBase { + + private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1}"; + + public static Stream configParams() { + Object[][] data = new Object[][] { + {true, HoodieTableType.COPY_ON_WRITE}, {false, HoodieTableType.COPY_ON_WRITE}, + {true, HoodieTableType.MERGE_ON_READ}, {false, HoodieTableType.MERGE_ON_READ} + }; + return Stream.of(data).map(Arguments::of); + } + + @Test + public void testLeftOverUpdatedPropFileCleanup() throws IOException { + testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testUpgrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + testUpgradeInternal(false, deletePartialMarkerFiles, tableType); + } + + public void testUpgradeInternal(boolean induceResiduesFromPrevUpgrade, boolean deletePartialMarkerFiles, HoodieTableType tableType) 
throws IOException { + // init config, table and client. + Map params = new HashMap<>(); + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg); + + // prepare data. Make 2 commits, in which 2nd is not committed. + List firstPartitionCommit2FileSlices = new ArrayList<>(); + List secondPartitionCommit2FileSlices = new ArrayList<>(); + Pair, List> inputRecords = twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, client, false); + + HoodieTable table = this.getHoodieTable(metaClient, cfg); + HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get(); + + // delete one of the marker files in 2nd commit if need be. + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + List markerPaths = markerFiles.allMarkerFilePaths(); + if (deletePartialMarkerFiles) { + String toDeleteMarkerFile = markerPaths.get(0); + table.getMetaClient().getFs().delete(new Path(table.getMetaClient().getTempFolderPath() + "/" + commitInstant.getTimestamp() + "/" + toDeleteMarkerFile)); + markerPaths.remove(toDeleteMarkerFile); + } + + // set hoodie.table.version to 0 in hoodie.properties file + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ZERO); + + if (induceResiduesFromPrevUpgrade) { + createResidualFile(); + } + + // should re-create marker files for 2nd commit since its pending. 
+ UpgradeDowngrade.run(metaClient, HoodieTableVersion.ONE, cfg, jsc, null); + + // assert marker files + assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices); + + // verify hoodie.table.version got upgraded + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ONE.versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.ONE); + + // trigger 3rd commit with marker based rollback enabled. + List thirdBatch = triggerCommit("003", tableType, true); + + // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. + assertRows(inputRecords.getKey(), thirdBatch); + if (induceResiduesFromPrevUpgrade) { + assertFalse(fs.exists(new Path(metaClient.getMetaPath(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE))); + } + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + // init config, table and client. + Map params = new HashMap<>(); + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg); + + // prepare data. Make 2 commits, in which 2nd is not committed. 
+ List firstPartitionCommit2FileSlices = new ArrayList<>(); + List secondPartitionCommit2FileSlices = new ArrayList<>(); + Pair, List> inputRecords = twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, client, false); + + HoodieTable table = this.getHoodieTable(metaClient, cfg); + HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get(); + + // delete one of the marker files in 2nd commit if need be. + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + List markerPaths = markerFiles.allMarkerFilePaths(); + if (deletePartialMarkerFiles) { + String toDeleteMarkerFile = markerPaths.get(0); + table.getMetaClient().getFs().delete(new Path(table.getMetaClient().getTempFolderPath() + "/" + commitInstant.getTimestamp() + "/" + toDeleteMarkerFile)); + markerPaths.remove(toDeleteMarkerFile); + } + + // set hoodie.table.version to 1 in hoodie.properties file + prepForDowngrade(); + + // downgrade should be performed. all marker files should be deleted + UpgradeDowngrade.run(metaClient, HoodieTableVersion.ZERO, cfg, jsc, null); + + // assert marker files + assertMarkerFilesForDowngrade(table, commitInstant); + + // verify hoodie.table.version got downgraded + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.ZERO); + + // trigger 3rd commit with marker based rollback disabled. + List thirdBatch = triggerCommit("003", tableType, false); + + // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. 
+ assertRows(inputRecords.getKey(), thirdBatch); + } + + private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant) throws IOException { + // Verify recreated marker files are as expected + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + assertFalse(markerFiles.doesMarkerDirExist()); + } + + private void assertMarkerFilesForUpgrade(HoodieTable table, HoodieInstant commitInstant, List firstPartitionCommit2FileSlices, + List secondPartitionCommit2FileSlices) throws IOException { + // Verify recreated marker files are as expected + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + assertTrue(markerFiles.doesMarkerDirExist()); + List files = markerFiles.allMarkerFilePaths(); + + assertEquals(2, files.size()); + List actualFiles = new ArrayList<>(); + for (String file : files) { + String fileName = MarkerFiles.stripMarkerSuffix(file); + actualFiles.add(fileName); + } + + List expectedFileSlices = new ArrayList<>(); + expectedFileSlices.addAll(firstPartitionCommit2FileSlices); + expectedFileSlices.addAll(secondPartitionCommit2FileSlices); + + List expectedPaths = new ArrayList<>(); + List> expectedLogFilePaths = new ArrayList<>(); + + for (FileSlice fileSlice : expectedFileSlices) { + String partitionPath = fileSlice.getPartitionPath(); + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + for (HoodieLogFile logFile : fileSlice.getLogFiles().collect(Collectors.toList())) { + // log file format can't be matched as is, since the write token can't be asserted. Hence asserting for partitionpath, fileId and baseCommit time. 
+ String logBaseCommitTime = logFile.getBaseCommitTime(); + expectedLogFilePaths.add(Pair.of(partitionPath + "/" + logFile.getFileId(), logBaseCommitTime)); + } + } + if (fileSlice.getBaseInstantTime().equals(commitInstant.getTimestamp())) { + String path = fileSlice.getBaseFile().get().getPath(); + // for base files, path can be asserted as is. + expectedPaths.add(path.substring(path.indexOf(partitionPath))); + } + } + + // Trim log file paths only + List trimmedActualFiles = new ArrayList<>(); + for (String actualFile : actualFiles) { + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + trimmedActualFiles.add(actualFile.substring(0, actualFile.lastIndexOf('.'))); + } else { + trimmedActualFiles.add(actualFile); + } + } + // assert for base files. + for (String expected : expectedPaths) { + if (trimmedActualFiles.contains(expected)) { + trimmedActualFiles.remove(expected); + } + } + + if (expectedLogFilePaths.size() > 0) { + // assert for log files + List> actualLogFiles = new ArrayList<>(); + for (String actual : trimmedActualFiles) { + actualLogFiles.add(Pair.of(actual.substring(0, actual.indexOf('_')), actual.substring(actual.lastIndexOf('_') + 1))); + } + assertEquals(expectedLogFilePaths.size(), actualLogFiles.size()); + for (Pair entry : expectedLogFilePaths) { + assertTrue(actualLogFiles.contains(entry)); + } + } else { + assertTrue(trimmedActualFiles.size() == 0); + } + } + + private List triggerCommit(String newCommitTime, HoodieTableType tableType, boolean enableMarkedBasedRollback) { + Map params = new HashMap<>(); + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(enableMarkedBasedRollback).withProps(params).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg, true); + + client.startCommitWithTime(newCommitTime); + + List records = 
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2); + JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD statuses = client.upsert(writeRecords, newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + client.commit(newCommitTime, statuses); + return records; + } + + private void assertRows(List firstBatch, List secondBatch) { + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + Dataset rows = HoodieClientTestUtils.read(jsc, metaClient.getBasePath(), sqlContext, metaClient.getFs(), fullPartitionPaths); + List expectedRecordKeys = new ArrayList<>(); + for (HoodieRecord rec : firstBatch) { + expectedRecordKeys.add(rec.getRecordKey()); + } + + for (HoodieRecord rec : secondBatch) { + expectedRecordKeys.add(rec.getRecordKey()); + } + List rowList = rows.collectAsList(); + assertEquals(expectedRecordKeys.size(), rows.count()); + + List actualRecordKeys = new ArrayList<>(); + for (Row row : rowList) { + actualRecordKeys.add(row.getAs("_row_key")); + } + + for (String expectedRecordKey : expectedRecordKeys) { + assertTrue(actualRecordKeys.contains(expectedRecordKey)); + } + } + + /** + * Create two commits and may or may not commit 2nd commit. + * + * @param firstPartitionCommit2FileSlices list to hold file slices in first partition. + * @param secondPartitionCommit2FileSlices list of hold file slices from second partition. + * @param cfg instance of {@link HoodieWriteConfig} + * @param client instance of {@link HoodieWriteClient} to use. + * @param commitSecondUpsert true if 2nd commit needs to be committed. false otherwise. + * @return a pair of list of records from 1st and 2nd batch. 
+ */ + private Pair, List> twoUpsertCommitDataWithTwoPartitions(List firstPartitionCommit2FileSlices, + List secondPartitionCommit2FileSlices, + HoodieWriteConfig cfg, HoodieWriteClient client, + boolean commitSecondUpsert) throws IOException { + //just generate two partitions + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); + //1. prepare data + HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2); + JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD statuses = client.upsert(writeRecords, newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + client.commit(newCommitTime, statuses); + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + List records2 = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records2, 1), newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + if (commitSecondUpsert) { + client.commit(newCommitTime, statuses); + } + + //2. assert filegroup and get the first partition fileslice + HoodieTable table = this.getHoodieTable(metaClient, cfg); + SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient()); + List firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList()); + assertEquals(1, firstPartitionCommit2FileGroups.size()); + firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList())); + //3. 
assert filegroup and get the second partition fileslice + List secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList()); + assertEquals(1, secondPartitionCommit2FileGroups.size()); + secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList())); + + //4. assert fileslice + HoodieTableType tableType = metaClient.getTableType(); + if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) { + assertEquals(2, firstPartitionCommit2FileSlices.size()); + assertEquals(2, secondPartitionCommit2FileSlices.size()); + } else { + assertEquals(1, firstPartitionCommit2FileSlices.size()); + assertEquals(1, secondPartitionCommit2FileSlices.size()); + } + return Pair.of(records, records2); + } + + private void prepForDowngrade() throws IOException { + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) { + metaClient.getTableConfig().getProperties().store(os, ""); + } + } + + private void createResidualFile() throws IOException { + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); + + // Step1: Copy hoodie.properties to hoodie.properties.orig + FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile, + false, metaClient.getHadoopConf()); + } + + private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException { + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + // Load the properties and verify + FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile); 
+ Properties prop = new Properties(); + prop.load(fsDataInputStream); + fsDataInputStream.close(); + assertEquals(Integer.toString(expectedVersion.versionCode()), prop.getProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME)); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 2526d953e..6db6529f2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -309,4 +309,29 @@ public class HoodieClientTestUtils { f.createNewFile(); return f.getAbsolutePath(); } + + public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException { + createTempFolderForMarkerFiles(basePath); + String folderPath = getTempFolderName(basePath); + // create dir for this instant + new File(folderPath + "/" + instantTime + "/" + partitionPath).mkdirs(); + new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile(); + } + + public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) { + String folderPath = getTempFolderName(basePath); + File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath); + if (markerDir.exists()) { + return markerDir.listFiles((dir, name) -> name.contains(".marker.MERGE")).length; + } + return 0; + } + + public static void createTempFolderForMarkerFiles(String basePath) { + new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs(); + } + + public static String getTempFolderName(String basePath) { + return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
// ---- HoodieTableConfig.java (post-patch excerpt; hunk context elided) ----
// import added: org.apache.hudi.common.util.ValidationUtils

  // New property key: records the table's version in hoodie.properties so upgrade/downgrade
  // can detect which on-disk layout (e.g. marker-based rollback) a table was written with.
  public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version";

  // Tables with no explicit version entry predate 0.6.0 and are treated as version ZERO.
  public static final HoodieTableVersion DEFAULT_TABLE_VERSION = HoodieTableVersion.ZERO;

  // (tail of HoodieTableConfig(FileSystem, String, String) constructor, after props are loaded)
  // Guard against a truncated hoodie.properties left behind by a crashed upgrade/downgrade:
  // table type and name must always be present in a valid file.
    ValidationUtils.checkArgument(props.containsKey(HOODIE_TABLE_TYPE_PROP_NAME) && props.containsKey(HOODIE_TABLE_NAME_PROP_NAME),
        "hoodie.properties file seems invalid. Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry");

  /**
   * @return the hoodie.table.version from hoodie.properties file, falling back to
   *         {@link #DEFAULT_TABLE_VERSION} when the property is absent.
   */
  public HoodieTableVersion getTableVersion() {
    return props.containsKey(HOODIE_TABLE_VERSION_PROP_NAME)
        ? HoodieTableVersion.versionFromCode(Integer.parseInt(props.getProperty(HOODIE_TABLE_VERSION_PROP_NAME)))
        : DEFAULT_TABLE_VERSION;
  }

  /**
   * Records the table version in the in-memory properties only; persisting the change to
   * hoodie.properties is the caller's responsibility (see UpgradeDowngrade).
   */
  public void setTableVersion(HoodieTableVersion tableVersion) {
    props.put(HOODIE_TABLE_VERSION_PROP_NAME, Integer.toString(tableVersion.versionCode()));
  }

  /**
   * @return the backing Properties. NOTE(review): exposes internal mutable state; callers
   *         (tests, upgrade flow) rely on mutating and re-storing it.
   */
  public Properties getProperties() {
    return props;
  }

// ---- HoodieTableMetaClient.java (post-patch excerpt) ----
// initTableType(...) now stamps new tables with the current writer version:
      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME, String.valueOf(HoodieTableVersion.current().versionCode()));
// setTableConfig(HoodieTableConfig) is removed — the table config is no longer
// replaceable after the meta client is constructed.
setTableConfig(HoodieTableConfig tableConfig) { - this.tableConfig = tableConfig; - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java new file mode 100644 index 000000000..eb2e200de --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table; + +import org.apache.hudi.exception.HoodieException; + +import java.util.Arrays; + +/** + * Table's version that controls what version of writer/readers can actually read/write + * to a given table. 
+ */ +public enum HoodieTableVersion { + // < 0.6.0 versions + ZERO(0), + // 0.6.0 onwards + ONE(1); + + private final int versionCode; + + HoodieTableVersion(int versionCode) { + this.versionCode = versionCode; + } + + public int versionCode() { + return versionCode; + } + + public static HoodieTableVersion current() { + return ONE; + } + + static HoodieTableVersion versionFromCode(int versionCode) { + return Arrays.stream(HoodieTableVersion.values()) + .filter(v -> v.versionCode == versionCode).findAny() + .orElseThrow(() -> new HoodieException("Unknown versionCode:" + versionCode)); + } +}