diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 5f9509840..e6a3f529c 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -23,7 +23,10 @@ import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.client.utils.ClientUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieBootstrapConfig; @@ -31,6 +34,7 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; import org.apache.hudi.utilities.HDFSParquetImporter; import org.apache.hudi.utilities.HDFSParquetImporter.Config; @@ -64,7 +68,7 @@ public class SparkMain { */ enum SparkCommand { BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, - COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT + COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE } public static void main(String[] args) throws Exception { @@ -185,6 +189,11 @@ public class SparkMain { returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], 
args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs); break; + case UPGRADE: + case DOWNGRADE: + assert (args.length == 5); + returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]); + break; default: break; } @@ -380,9 +389,35 @@ public class SparkMain { } } + /** + * Upgrade or downgrade table. + * + * @param jsc instance of {@link JavaSparkContext} to use. + * @param basePath base path of the dataset. + * @param toVersion version to which upgrade/downgrade to be done. + * @return 0 if success, else -1. + * @throws Exception + */ + protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) { + HoodieWriteConfig config = getWriteConfig(basePath); + HoodieTableMetaClient metaClient = ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, false); + try { + UpgradeDowngrade.run(metaClient, HoodieTableVersion.valueOf(toVersion), config, jsc, null); + LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion)); + return 0; + } catch (Exception e) { + LOG.warn(String.format("Failed: Could not upgrade/downgrade table at \"%s\" to version \"%s\".", basePath, toVersion), e); + return -1; + } + } + private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); + HoodieWriteConfig config = getWriteConfig(basePath); return new HoodieWriteClient(jsc, config); } + + private static HoodieWriteConfig getWriteConfig(String basePath) { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); + } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java new file mode 100644 index 000000000..deb9e0727 --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/UpgradeOrDowngradeCommand.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.commands.SparkMain.SparkCommand; +import org.apache.hudi.cli.utils.InputStreamConsumer; +import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import org.apache.spark.launcher.SparkLauncher; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; + +/** + * CLI command to assist in upgrading/downgrading Hoodie dataset to a different version. 
+ */ +public class UpgradeOrDowngradeCommand implements CommandMarker { + + @CliCommand(value = "upgrade hoodie dataset ", help = "Upgrades hoodie dataset") + public String upgradeHoodieDataset( + @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = "Spark executor memory") final String sparkMemory) + throws Exception { + + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.UPGRADE.toString(), master, sparkMemory, metaClient.getBasePath(), toVersion); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + HoodieCLI.refreshTableMetadata(); + if (exitCode != 0) { + return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion); + } + return String.format("Hoodie dataset upgraded/downgraded to \"%s\".", toVersion); + } + + @CliCommand(value = "downgrade hoodie dataset ", help = "Downgrades hoodie dataset") + public String downgradeHoodieDataset( + @CliOption(key = {"toVersion"}, help = "To version of Hoodie dataset to be upgraded/downgraded to", unspecifiedDefaultValue = "") final String toVersion, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = "Spark executor memory") final String sparkMemory) + throws Exception { + + 
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.DOWNGRADE.toString(), master, sparkMemory, metaClient.getBasePath(), toVersion); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + HoodieCLI.refreshTableMetadata(); + if (exitCode != 0) { + return String.format("Failed: Could not Upgrade/Downgrade Hoodie dataset to \"%s\".", toVersion); + } + return String.format("Hoodie dataset upgraded/downgraded to \"%s\".", toVersion); + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index afad53b93..ed14a64fc 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -167,10 +167,10 @@ public class TestRepairsCommand extends AbstractShellIntegrationTest { assertEquals(expected, result); // check result - List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", + List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", "hoodie.table.version", "hoodie.archivelog.folder", "hoodie.timeline.layout.version"); String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key, - oldProps.getOrDefault(key, null), result.getOrDefault(key, null)}) + oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")}) .toArray(String[][]::new); String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY, HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java new file mode 100644 index 000000000..4c0479a58 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link UpgradeOrDowngradeCommand}. 
+ */ +public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest { + + private String tablePath; + + @BeforeEach + public void init() throws IOException { + String tableName = "test_table"; + tablePath = basePath + File.separator + tableName; + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + + //Create some commits files and parquet files + String commitTime1 = "100"; + String commitTime2 = "101"; + HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); + + // one commit file + HoodieTestUtils.createCommitFiles(tablePath, commitTime1); + // one .inflight commit file + HoodieTestUtils.createInflightCommitFiles(tablePath, commitTime2); + + // generate commit files for commit 100 + for (String commitTime : Arrays.asList(commitTime1)) { + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + } + + // generate commit and marker files for inflight commit 101 + for (String commitTime : Arrays.asList(commitTime2)) { + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + HoodieTestUtils.createDataFile(tablePath, 
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + } + } + + @Test + public void testDowngradeCommand() throws Exception { + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + // update hoodie.table.version to 1 + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); + try (FSDataOutputStream os = metaClient.getFs().create(new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE), true)) { + metaClient.getTableConfig().getProperties().store(os, ""); + } + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + // verify marker files for inflight commit exists + for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) { + assertEquals(1, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101")); + } + + SparkMain.upgradeOrDowngradeTable(jsc, tablePath, HoodieTableVersion.ZERO.name()); + + // verify hoodie.table.version got downgraded + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + // verify hoodie.table.version + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode()); + assertTableVersionFromPropertyFile(); + + // verify marker files are non existant + for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) { + assertEquals(0, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101")); + } + } + + private void assertTableVersionFromPropertyFile() throws IOException { + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + // Load the properties and verify + FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile); + Properties prop = new Properties(); + 
prop.load(fsDataInputStream); + fsDataInputStream.close(); + assertEquals(Integer.toString(HoodieTableVersion.ZERO.versionCode()), prop.getProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME)); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 644aca9da..81b016121 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -38,6 +39,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -198,10 +201,16 @@ public abstract class AbstractHoodieWriteClient e * Get HoodieTable and init {@link Timer.Context}. 
* * @param operationType write operation type + * @param instantTime current inflight instant time * @return HoodieTable */ - protected HoodieTable getTableAndInitCtx(WriteOperationType operationType) { + protected HoodieTable getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); + UpgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, jsc, instantTime); + return getTableAndInitCtx(metaClient, operationType); + } + + private HoodieTable getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { if (operationType == WriteOperationType.DELETE) { setWriteSchemaForDeletes(metaClient); } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 2a1520a77..9f6df7bdf 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -156,7 +156,7 @@ public class HoodieWriteClient extends AbstractHo if (rollbackPending) { rollBackInflightBootstrap(); } - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS); table.bootstrap(jsc, extraMetadata); } @@ -186,7 +186,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsert(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -207,7 +207,7 @@ public class 
HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); table.validateUpsertSchema(); setOperationType(WriteOperationType.UPSERT_PREPPED); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -226,7 +226,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insert(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT); + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -246,7 +246,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED); + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.INSERT_PREPPED); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -285,8 +285,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsert(JavaRDD> records, final String instantTime, - Option userDefinedBulkInsertPartitioner) { - HoodieTable table = 
getTableAndInitCtx(WriteOperationType.BULK_INSERT); + Option userDefinedBulkInsertPartitioner) { + HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -311,8 +311,8 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String instantTime, - Option bulkInsertPartitioner) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED); + Option bulkInsertPartitioner) { + HoodieTable table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT_PREPPED); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); @@ -329,7 +329,7 @@ public class HoodieWriteClient extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD delete(JavaRDD keys, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.DELETE); + HoodieTable table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); setOperationType(WriteOperationType.DELETE); HoodieWriteMetadata result = table.delete(jsc,instantTime, keys); return postWrite(result, instantTime, table); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 268069080..ce2c8ca10 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.BaseActionExecutor; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -51,6 +52,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor execute(HoodieInstant instantToRollback); } @@ -60,23 +62,23 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor table, - String instantTime, - HoodieInstant instantToRollback, - boolean deleteInstants) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants) { this(jsc, config, table, instantTime, instantToRollback, deleteInstants, false, config.shouldRollbackUsingMarkers()); } public BaseRollbackActionExecutor(JavaSparkContext jsc, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant instantToRollback, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { super(jsc, config, table, instantTime); this.instantToRollback = instantToRollback; this.deleteInstants = deleteInstants; @@ -84,7 +86,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor executeRollbackUsingFileListing(HoodieInstant instantToRollback) { - List rollbackRequests = generateRollbackRequestsByListing(); + List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + 
config.shouldAssumeDatePartitioning()); return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, instantToRollback, rollbackRequests); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index d7304fa1c..3c94df489 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; @@ -68,34 +69,48 @@ public class ListingBasedRollbackHelper implements Serializable { * Performs all rollback actions that we have collected in parallel. 
*/ public List performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback, List rollbackRequests) { - SerializablePathFilter filter = (path) -> { - if (path.toString().endsWith(this.metaClient.getTableConfig().getBaseFileFormat().getFileExtension())) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return instantToRollback.getTimestamp().equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return instantToRollback.getTimestamp().equals(fileCommitTime); - } - return false; - }; - int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); jsc.setJobGroup(this.getClass().getSimpleName(), "Perform rollback actions"); + JavaPairRDD partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(jsc, instantToRollback, rollbackRequests, sparkPartitions, true); + return partitionPathRollbackStatsPairRDD.reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect(); + } + + /** + * Collect all file info that needs to be rollbacked. + */ + public List collectRollbackStats(JavaSparkContext jsc, HoodieInstant instantToRollback, List rollbackRequests) { + int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); + jsc.setJobGroup(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade"); + JavaPairRDD partitionPathRollbackStatsPairRDD = maybeDeleteAndCollectStats(jsc, instantToRollback, rollbackRequests, sparkPartitions, false); + return partitionPathRollbackStatsPairRDD.map(Tuple2::_2).collect(); + } + + /** + * May be delete interested files and collect stats or collect stats only. + * + * @param jsc instance of {@link JavaSparkContext} to use. + * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. 
+ * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. + * @param sparkPartitions number of spark partitions to use for parallelism. + * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes. + * @return stats collected with or w/o actual deletions. + */ + JavaPairRDD maybeDeleteAndCollectStats(JavaSparkContext jsc, HoodieInstant instantToRollback, List rollbackRequests, + int sparkPartitions, boolean doDelete) { return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> { switch (rollbackRequest.getType()) { case DELETE_DATA_FILES_ONLY: { - final Map filesToDeletedStatus = deleteCleanedFiles(metaClient, config, instantToRollback.getTimestamp(), - rollbackRequest.getPartitionPath()); + final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(), + rollbackRequest.getPartitionPath(), doDelete); return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); } case DELETE_DATA_AND_LOG_FILES: { - final Map filesToDeletedStatus = deleteCleanedFiles(metaClient, config, rollbackRequest.getPartitionPath(), filter); + final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withDeletedFileResults(filesToDeletedStatus).build()); + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); } 
case APPEND_ROLLBACK_BLOCK: { Writer writer = null; @@ -107,9 +122,11 @@ public class ListingBasedRollbackHelper implements Serializable { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); // generate metadata - Map header = generateHeader(instantToRollback.getTimestamp()); - // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); + if (doDelete) { + Map header = generateHeader(instantToRollback.getTimestamp()); + // if update belongs to an existing log file + writer = writer.appendBlock(new HoodieCommandBlock(header)); + } } catch (IOException | InterruptedException io) { throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); } finally { @@ -130,30 +147,46 @@ public class ListingBasedRollbackHelper implements Serializable { 1L ); return new Tuple2<>(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); } - }).reduceByKey(RollbackUtils::mergeRollbackStat).map(Tuple2::_2).collect(); + }); } - /** * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
*/ - private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String partitionPath, PathFilter filter) throws IOException { + private Map deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, boolean doDelete) throws IOException { LOG.info("Cleaning path " + partitionPath); + String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + SerializablePathFilter filter = (path) -> { + if (path.toString().endsWith(basefileExtension)) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commit.equals(fileCommitTime); + } else if (FSUtils.isLogFile(path)) { + // Since the baseCommitTime is the only commit for new log files, it's okay here + String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); + return commit.equals(fileCommitTime); + } + return false; + }; + final Map results = new HashMap<>(); FileSystem fs = metaClient.getFs(); FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); + if (doDelete) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + LOG.info("Delete file " + file.getPath() + "\t" + success); + } else { + results.put(file, true); + } } return results; } @@ -161,8 +194,8 @@ public class ListingBasedRollbackHelper implements Serializable { /** * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
*/ - private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath) throws IOException { + private Map deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, boolean doDelete) throws IOException { final Map results = new HashMap<>(); LOG.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); @@ -176,9 +209,13 @@ public class ListingBasedRollbackHelper implements Serializable { }; FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { - boolean success = fs.delete(file.getPath(), false); - results.put(file, success); - LOG.info("Delete file " + file.getPath() + "\t" + success); + if (doDelete) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + LOG.info("Delete file " + file.getPath() + "\t" + success); + } else { + results.put(file, true); + } } return results; } @@ -194,5 +231,6 @@ public class ListingBasedRollbackHelper implements Serializable { } public interface SerializablePathFilter extends PathFilter, Serializable { + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java index c29cefaad..97e110dc0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -19,18 +19,12 @@ package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import 
org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -38,31 +32,28 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecutor { private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class); public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { super(jsc, config, table, instantTime, commitInstant, deleteInstants); } public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc, - HoodieWriteConfig config, - HoodieTable table, - String instantTime, - HoodieInstant commitInstant, - boolean deleteInstants, - boolean skipTimelinePublish, - boolean useMarkerBasedStrategy) { + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); } @@ -90,7 
+81,6 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks. - // For Requested State (like failure during index lookup), there is nothing to do rollback other than // deleting the timeline file if (!resolvedInstant.isRequested()) { @@ -110,144 +100,10 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto protected List executeRollbackUsingFileListing(HoodieInstant resolvedInstant) { List rollbackRequests; try { - rollbackRequests = generateRollbackRequestsUsingFileListing(resolvedInstant); + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, jsc); } catch (IOException e) { throw new HoodieIOException("Error generating rollback requests by file listing.", e); } return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests); } - - /** - * Generate all rollback requests that we need to perform for rolling back this action without actually performing - * rolling back. 
- * - * @param instantToRollback Instant to Rollback - * @return list of rollback requests - */ - private List generateRollbackRequestsUsingFileListing(HoodieInstant instantToRollback) throws IOException { - String commit = instantToRollback.getTimestamp(); - List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), - config.shouldAssumeDatePartitioning()); - int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); - jsc.setJobGroup(this.getClass().getSimpleName(), "Generate all rollback requests"); - return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { - HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); - List partitionRollbackRequests = new ArrayList<>(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - LOG.info("Rolling back commit action."); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. - boolean higherDeltaCommits = - !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created parquet files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - LOG.info("Rolling back compaction. There are higher delta commits. 
So only deleting data files"); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath)); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. - LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" - + " log files"); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. 
- // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. 
- try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getMetaClient().getCommitTimeline() - .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), - HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); - } - break; - } catch (IOException io) { - throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io); - } - default: - break; - } - return partitionRollbackRequests.iterator(); - }).filter(Objects::nonNull).collect(); - } - - private List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, - HoodieCommitMetadata commitMetadata) { - ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); - - // wStat.getPrevCommit() might not give the right commit time in the following - // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be - // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. 
- // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the - // baseCommit always by listing the file slice - Map fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath) - .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { - - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); - - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - ValidationUtils - .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), - HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp())); - } - - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime); - }).collect(Collectors.toList()); - } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index cbb5a2c82..3bfd645e6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -19,19 +19,39 @@ package org.apache.hudi.table.action.rollback; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import java.io.IOException; import 
java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; public class RollbackUtils { + private static final Logger LOG = LogManager.getLogger(RollbackUtils.class); + static Map generateHeader(String instantToRollback, String rollbackInstantTime) { // generate metadata Map header = new HashMap<>(3); @@ -54,6 +74,7 @@ public class RollbackUtils { final List successDeleteFiles = new ArrayList<>(); final List failedDeleteFiles = new ArrayList<>(); final Map commandBlocksCount = new HashMap<>(); + final List filesToRollback = new ArrayList<>(); Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll); Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll); Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll); @@ -63,4 +84,156 @@ public class RollbackUtils { return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); } + /** + * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type. + * @param fs instance of {@link FileSystem} to use. + * @param basePath base path of interest. + * @param shouldAssumeDatePartitioning {@code true} if date partitioning should be assumed. {@code false} otherwise. + * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected. 
+ */ + public static List generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean shouldAssumeDatePartitioning) { + try { + return FSUtils.getAllPartitionPaths(fs, basePath, shouldAssumeDatePartitioning).stream() + .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new HoodieIOException("Error generating rollback requests", e); + } + } + + /** + * Generate all rollback requests that we need to perform for rolling back this action without actually performing rolling back for MOR table type. + * + * @param instantToRollback Instant to Rollback + * @param table instance of {@link HoodieTable} to use. + * @param jsc instance of {@link JavaSparkContext} to use. + * @return list of rollback requests + */ + public static List generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, JavaSparkContext jsc) throws IOException { + String commit = instantToRollback.getTimestamp(); + HoodieWriteConfig config = table.getConfig(); + List partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + config.shouldAssumeDatePartitioning()); + int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); + jsc.setJobGroup(RollbackUtils.class.getSimpleName(), "Generate all rollback requests"); + return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { + HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); + List partitionRollbackRequests = new ArrayList<>(); + switch (instantToRollback.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + LOG.info("Rolling back commit action."); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + break; + case HoodieTimeline.COMPACTION_ACTION: 
+ // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. + boolean higherDeltaCommits = + !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath)); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. + LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" + + " log files"); + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + } + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. 
+ // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. 
+ try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline() + .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) + .get(), + HoodieCommitMetadata.class); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); + } + break; + } catch (IOException io) { + throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io); + } + default: + break; + } + return partitionRollbackRequests.iterator(); + }).filter(Objects::nonNull).collect(); + } + + private static List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, + HoodieCommitMetadata commitMetadata, HoodieTable table) { + ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + + // wStat.getPrevCommit() might not give the right commit time in the following + // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be + // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. 
+ // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the + // baseCommit always by listing the file slice + Map fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath) + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); + return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { + + // Filter out stats without prevCommit since they are all inserts + boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) + && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + + if (validForRollback) { + // For sanity, log instant time can never be less than base-commit on which we are rolling back + ValidationUtils + .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), + HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp())); + } + + return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( + // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option + // to delete and we should not step on it + wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); + }).map(wStat -> { + String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); + return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), + baseCommitTime); + }).collect(Collectors.toList()); + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java new file mode 100644 index 000000000..948e44c34 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/DowngradeHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Interface to assist in downgrading Hoodie table. + */ +public interface DowngradeHandler { + + /** + * to be invoked to downgrade hoodie table from one version to a lower version. 
+ * + * @param config instance of {@link HoodieWriteConfig} to be used. + * @param jsc instance of {@link JavaSparkContext} to be used. + * @param instantTime current instant time that should not be touched. + */ + void downgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime); +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java new file mode 100644 index 000000000..c7b45eabc --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/HoodieUpgradeDowngradeException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieUpgradeDowngradeException extends HoodieException { + + public HoodieUpgradeDowngradeException(String msg, Throwable t) { + super(msg, t); + } + + public HoodieUpgradeDowngradeException(int fromVersion, int toVersion, boolean upgrade) { + super(String.format("Cannot %s from version %s -> %s", upgrade ?
"upgrade" : "downgrade", fromVersion, toVersion), null); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java new file mode 100644 index 000000000..108bdbbf0 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/OneToZeroDowngradeHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; + +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Downgrade handle to assist in downgrading hoodie table from version 1 to 0. 
+ */ +public class OneToZeroDowngradeHandler implements DowngradeHandler { + + @Override + public void downgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime) { + // fetch pending commit info + HoodieTable table = HoodieTable.create(config, jsc.hadoopConfiguration()); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + List commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList()); + for (HoodieInstant commitInstant : commits) { + // delete existing marker files + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + markerFiles.quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism()); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java new file mode 100644 index 000000000..b72c42d48 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.Date; +import java.util.Properties; + +/** + * Helper class to assist in upgrading/downgrading Hoodie when there is a version change. + */ +public class UpgradeDowngrade { + + private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class); + public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated"; + + private HoodieTableMetaClient metaClient; + private HoodieWriteConfig config; + private JavaSparkContext jsc; + private transient FileSystem fs; + private Path updatedPropsFilePath; + private Path propsFilePath; + + /** + * Perform Upgrade or Downgrade steps if required and updated table version if need be. + *

+ * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths. + * + * Essentially, if a dataset was created using any pre-0.6.0 release (for eg 0.5.3), and Hoodie version was upgraded to 0.6.0, + * Hoodie table version gets bumped to 1 and there are some upgrade steps that need to be executed before doing any writes. + * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded + * to pre-0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes. + * + * On a high level, these are the steps performed + * + * Step1 : Understand current hoodie table version and table version from hoodie.properties file + * Step2 : Delete any left over .updated from previous upgrade/downgrade + * Step3 : If versions are different, perform upgrade/downgrade. + * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated + * Step5 : Rename hoodie.properties.updated to hoodie.properties + *

+ * + * @param metaClient instance of {@link HoodieTableMetaClient} to use + * @param toVersion version to which upgrade or downgrade has to be done. + * @param config instance of {@link HoodieWriteConfig} to use. + * @param jsc instance of {@link JavaSparkContext} to use. + * @param instantTime current instant time that should not be touched. + */ + public static void run(HoodieTableMetaClient metaClient, HoodieTableVersion toVersion, HoodieWriteConfig config, + JavaSparkContext jsc, String instantTime) { + try { + new UpgradeDowngrade(metaClient, config, jsc).run(toVersion, instantTime); + } catch (IOException e) { + throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e); + } + } + + private UpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, JavaSparkContext jsc) { + this.metaClient = metaClient; + this.config = config; + this.jsc = jsc; + this.fs = metaClient.getFs(); + this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE); + this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); + } + + private void run(HoodieTableVersion toVersion, String instantTime) throws IOException { + // Fetch version from property file and current version + HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion(); + if (toVersion.versionCode() == fromVersion.versionCode()) { + return; + } + + if (fs.exists(updatedPropsFilePath)) { + // this can be left over .updated file from a failed attempt before. Many cases exist here. + // a) We failed while writing the .updated file and it's content is partial (e.g hdfs) + // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway + // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file + // All cases, it simply suffices to delete the file and proceed. 
+ LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath))); + fs.delete(updatedPropsFilePath, false); + } + + // Perform the actual upgrade/downgrade; this has to be idempotent, for now. + LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion); + if (fromVersion.versionCode() < toVersion.versionCode()) { + // upgrade + upgrade(fromVersion, toVersion, instantTime); + } else { + // downgrade + downgrade(fromVersion, toVersion, instantTime); + } + + // Write out the current version in hoodie.properties.updated file + metaClient.getTableConfig().setTableVersion(toVersion); + createUpdatedFile(metaClient.getTableConfig().getProperties()); + + // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores. + // But as long as this does not leave a partial hoodie.properties file, we are okay. + fs.rename(updatedPropsFilePath, propsFilePath); + } + + private void createUpdatedFile(Properties props) throws IOException { + try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) { + props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); + } + } + + private void upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) { + new ZeroToOneUpgradeHandler().upgrade(config, jsc, instantTime); + } else { + throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); + } + } + + private void downgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) { + if (fromVersion == HoodieTableVersion.ONE && toVersion == HoodieTableVersion.ZERO) { + new OneToZeroDowngradeHandler().downgrade(config, jsc, instantTime); + } else { + throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); 
+ } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java new file mode 100644 index 000000000..4d56143a7 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Interface to assist in upgrading Hoodie table. + */ +public interface UpgradeHandler { + + /** + * to be invoked to upgrade hoodie table from one version to a higher version. + * + * @param config instance of {@link HoodieWriteConfig} to be used. + * @param jsc instance of {@link JavaSparkContext} to be used. + * @param instantTime current instant time that should not be touched. 
+ */ + void upgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime); +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java new file mode 100644 index 000000000..4960ff527 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.io.IOType; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; +import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest; +import org.apache.hudi.table.action.rollback.RollbackUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Upgrade handle to assist in upgrading hoodie table from version 0 to 1. 
+ */ +public class ZeroToOneUpgradeHandler implements UpgradeHandler { + + @Override + public void upgrade(HoodieWriteConfig config, JavaSparkContext jsc, String instantTime) { + // fetch pending commit info + HoodieTable table = HoodieTable.create(config, jsc.hadoopConfiguration()); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + if (commits.size() > 0 && instantTime != null) { + // ignore the latest inflight commit since a new commit would have been started and we need to fix any pending commits from previous launch + commits.remove(instantTime); + } + for (String commit : commits) { + // for every pending commit, delete old marker files and re-create marker files in new format + recreateMarkerFiles(commit, table, jsc, config.getMarkersDeleteParallelism()); + } + } + + /** + * Recreate marker files in new format. + * Step1: Delete existing marker files + * Step2: Collect all rollback file info. + * Step3: recreate marker files for all interested files. + * + * @param commitInstantTime instant of interest for which marker files need to be recreated. + * @param table instance of {@link HoodieTable} to use + * @param jsc instance of {@link JavaSparkContext} to use + * @throws HoodieRollbackException on any exception during upgrade. 
+ */ + private static void recreateMarkerFiles(final String commitInstantTime, HoodieTable table, JavaSparkContext jsc, int parallelism) throws HoodieRollbackException { + try { + // fetch hoodie instant + Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) + .findFirst()); + if (commitInstantOpt.isPresent()) { + // delete existing marker files + MarkerFiles markerFiles = new MarkerFiles(table, commitInstantTime); + markerFiles.quietDeleteMarkerDir(jsc, parallelism); + + // generate rollback stats + List rollbackRequests; + if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) { + rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + table.getConfig().shouldAssumeDatePartitioning()); + } else { + rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, jsc); + } + List rollbackStats = new ListingBasedRollbackHelper(table.getMetaClient(), table.getConfig()) + .collectRollbackStats(jsc, commitInstantOpt.get(), rollbackRequests); + + // recreate marker files adhering to marker based rollback + for (HoodieRollbackStat rollbackStat : rollbackStats) { + for (String path : rollbackStat.getSuccessDeleteFiles()) { + String dataFileName = path.substring(path.lastIndexOf("/") + 1); + // not feasible to differentiate MERGE from CREATE. hence creating with MERGE IOType for all base files. 
+ markerFiles.create(rollbackStat.getPartitionPath(), dataFileName, IOType.MERGE); + } + for (FileStatus fileStatus : rollbackStat.getCommandBlocksCount().keySet()) { + markerFiles.create(rollbackStat.getPartitionPath(), getFileNameForMarkerFromLogFile(fileStatus.getPath().toString(), table), IOType.APPEND); + } + } + } + } catch (Exception e) { + throw new HoodieRollbackException("Exception thrown while upgrading Hoodie Table from version 0 to 1", e); + } + } + + /** + * Curates file name for marker from existing log file path. + * log file format : partitionpath/.fileid_baseInstant.log.writetoken + * marker file format : partitionpath/fileId_writetoken_baseinstant.basefileExtn.marker.APPEND + * + * @param logFilePath log file path for which marker file name needs to be generated. + * @return the marker file name thus curated. + */ + private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) { + Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath); + String fileId = FSUtils.getFileIdFromLogPath(logPath); + String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath); + String writeToken = FSUtils.getWriteTokenFromLogPath(logPath); + + return FSUtils.makeDataFileName(baseInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension()); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java new file mode 100644 index 000000000..f40bd56e0 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.testutils.Assertions; +import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests {@link UpgradeDowngrade}. + */ +public class TestUpgradeDowngrade extends HoodieClientTestBase { + + private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with deletePartialMarkerFiles={0} and TableType = {1}"; + + public static Stream configParams() { + Object[][] data = new Object[][] { + {true, HoodieTableType.COPY_ON_WRITE}, {false, HoodieTableType.COPY_ON_WRITE}, + {true, HoodieTableType.MERGE_ON_READ}, {false, HoodieTableType.MERGE_ON_READ} + }; + return Stream.of(data).map(Arguments::of); + } + + @Test + public void testLeftOverUpdatedPropFileCleanup() throws IOException { + testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testUpgrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + testUpgradeInternal(false, deletePartialMarkerFiles, tableType); + } + + public void testUpgradeInternal(boolean induceResiduesFromPrevUpgrade, boolean deletePartialMarkerFiles, HoodieTableType tableType) 
throws IOException { + // init config, table and client. + Map params = new HashMap<>(); + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg); + + // prepare data. Make 2 commits, in which 2nd is not committed. + List firstPartitionCommit2FileSlices = new ArrayList<>(); + List secondPartitionCommit2FileSlices = new ArrayList<>(); + Pair, List> inputRecords = twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, client, false); + + HoodieTable table = this.getHoodieTable(metaClient, cfg); + HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get(); + + // delete one of the marker files in 2nd commit if need be. + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + List markerPaths = markerFiles.allMarkerFilePaths(); + if (deletePartialMarkerFiles) { + String toDeleteMarkerFile = markerPaths.get(0); + table.getMetaClient().getFs().delete(new Path(table.getMetaClient().getTempFolderPath() + "/" + commitInstant.getTimestamp() + "/" + toDeleteMarkerFile)); + markerPaths.remove(toDeleteMarkerFile); + } + + // set hoodie.table.version to 0 in hoodie.properties file + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ZERO); + + if (induceResiduesFromPrevUpgrade) { + createResidualFile(); + } + + // should re-create marker files for 2nd commit since its pending. 
+ UpgradeDowngrade.run(metaClient, HoodieTableVersion.ONE, cfg, jsc, null); + + // assert marker files + assertMarkerFilesForUpgrade(table, commitInstant, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices); + + // verify hoodie.table.version got upgraded + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ONE.versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.ONE); + + // trigger 3rd commit with marker based rollback enabled. + List thirdBatch = triggerCommit("003", tableType, true); + + // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. + assertRows(inputRecords.getKey(), thirdBatch); + if (induceResiduesFromPrevUpgrade) { + assertFalse(fs.exists(new Path(metaClient.getMetaPath(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE))); + } + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tableType) throws IOException { + // init config, table and client. + Map params = new HashMap<>(); + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg); + + // prepare data. Make 2 commits, in which 2nd is not committed. 
+ List firstPartitionCommit2FileSlices = new ArrayList<>(); + List secondPartitionCommit2FileSlices = new ArrayList<>(); + Pair, List> inputRecords = twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, client, false); + + HoodieTable table = this.getHoodieTable(metaClient, cfg); + HoodieInstant commitInstant = table.getPendingCommitTimeline().lastInstant().get(); + + // delete one of the marker files in 2nd commit if need be. + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + List markerPaths = markerFiles.allMarkerFilePaths(); + if (deletePartialMarkerFiles) { + String toDeleteMarkerFile = markerPaths.get(0); + table.getMetaClient().getFs().delete(new Path(table.getMetaClient().getTempFolderPath() + "/" + commitInstant.getTimestamp() + "/" + toDeleteMarkerFile)); + markerPaths.remove(toDeleteMarkerFile); + } + + // set hoodie.table.version to 1 in hoodie.properties file + prepForDowngrade(); + + // downgrade should be performed. all marker files should be deleted + UpgradeDowngrade.run(metaClient, HoodieTableVersion.ZERO, cfg, jsc, null); + + // assert marker files + assertMarkerFilesForDowngrade(table, commitInstant); + + // verify hoodie.table.version got downgraded + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode()); + assertTableVersionFromPropertyFile(HoodieTableVersion.ZERO); + + // trigger 3rd commit with marker based rollback disabled. + List thirdBatch = triggerCommit("003", tableType, false); + + // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. 
+ assertRows(inputRecords.getKey(), thirdBatch); + } + + private void assertMarkerFilesForDowngrade(HoodieTable table, HoodieInstant commitInstant) throws IOException { + // Verify recreated marker files are as expected + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + assertFalse(markerFiles.doesMarkerDirExist()); + } + + private void assertMarkerFilesForUpgrade(HoodieTable table, HoodieInstant commitInstant, List firstPartitionCommit2FileSlices, + List secondPartitionCommit2FileSlices) throws IOException { + // Verify recreated marker files are as expected + MarkerFiles markerFiles = new MarkerFiles(table, commitInstant.getTimestamp()); + assertTrue(markerFiles.doesMarkerDirExist()); + List files = markerFiles.allMarkerFilePaths(); + + assertEquals(2, files.size()); + List actualFiles = new ArrayList<>(); + for (String file : files) { + String fileName = MarkerFiles.stripMarkerSuffix(file); + actualFiles.add(fileName); + } + + List expectedFileSlices = new ArrayList<>(); + expectedFileSlices.addAll(firstPartitionCommit2FileSlices); + expectedFileSlices.addAll(secondPartitionCommit2FileSlices); + + List expectedPaths = new ArrayList<>(); + List> expectedLogFilePaths = new ArrayList<>(); + + for (FileSlice fileSlice : expectedFileSlices) { + String partitionPath = fileSlice.getPartitionPath(); + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + for (HoodieLogFile logFile : fileSlice.getLogFiles().collect(Collectors.toList())) { + // log file format can't be matched as is, since the write token can't be asserted. Hence asserting for partitionpath, fileId and baseCommit time. 
+ String logBaseCommitTime = logFile.getBaseCommitTime(); + expectedLogFilePaths.add(Pair.of(partitionPath + "/" + logFile.getFileId(), logBaseCommitTime)); + } + } + if (fileSlice.getBaseInstantTime().equals(commitInstant.getTimestamp())) { + String path = fileSlice.getBaseFile().get().getPath(); + // for base files, path can be asserted as is. + expectedPaths.add(path.substring(path.indexOf(partitionPath))); + } + } + + // Trim log file paths only + List trimmedActualFiles = new ArrayList<>(); + for (String actualFile : actualFiles) { + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + trimmedActualFiles.add(actualFile.substring(0, actualFile.lastIndexOf('.'))); + } else { + trimmedActualFiles.add(actualFile); + } + } + // assert for base files. + for (String expected : expectedPaths) { + if (trimmedActualFiles.contains(expected)) { + trimmedActualFiles.remove(expected); + } + } + + if (expectedLogFilePaths.size() > 0) { + // assert for log files + List> actualLogFiles = new ArrayList<>(); + for (String actual : trimmedActualFiles) { + actualLogFiles.add(Pair.of(actual.substring(0, actual.indexOf('_')), actual.substring(actual.lastIndexOf('_') + 1))); + } + assertEquals(expectedLogFilePaths.size(), actualLogFiles.size()); + for (Pair entry : expectedLogFilePaths) { + assertTrue(actualLogFiles.contains(entry)); + } + } else { + assertTrue(trimmedActualFiles.size() == 0); + } + } + + private List triggerCommit(String newCommitTime, HoodieTableType tableType, boolean enableMarkedBasedRollback) { + Map params = new HashMap<>(); + if (tableType == HoodieTableType.MERGE_ON_READ) { + params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); + } + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(enableMarkedBasedRollback).withProps(params).build(); + HoodieWriteClient client = getHoodieWriteClient(cfg, true); + + client.startCommitWithTime(newCommitTime); + + List records = 
dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2); + JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD statuses = client.upsert(writeRecords, newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + client.commit(newCommitTime, statuses); + return records; + } + + private void assertRows(List firstBatch, List secondBatch) { + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + Dataset rows = HoodieClientTestUtils.read(jsc, metaClient.getBasePath(), sqlContext, metaClient.getFs(), fullPartitionPaths); + List expectedRecordKeys = new ArrayList<>(); + for (HoodieRecord rec : firstBatch) { + expectedRecordKeys.add(rec.getRecordKey()); + } + + for (HoodieRecord rec : secondBatch) { + expectedRecordKeys.add(rec.getRecordKey()); + } + List rowList = rows.collectAsList(); + assertEquals(expectedRecordKeys.size(), rows.count()); + + List actualRecordKeys = new ArrayList<>(); + for (Row row : rowList) { + actualRecordKeys.add(row.getAs("_row_key")); + } + + for (String expectedRecordKey : expectedRecordKeys) { + assertTrue(actualRecordKeys.contains(expectedRecordKey)); + } + } + + /** + * Create two commits and may or may not commit 2nd commit. + * + * @param firstPartitionCommit2FileSlices list to hold file slices in first partition. + * @param secondPartitionCommit2FileSlices list of hold file slices from second partition. + * @param cfg instance of {@link HoodieWriteConfig} + * @param client instance of {@link HoodieWriteClient} to use. + * @param commitSecondUpsert true if 2nd commit needs to be committed. false otherwise. + * @return a pair of list of records from 1st and 2nd batch. 
+ */ + private Pair, List> twoUpsertCommitDataWithTwoPartitions(List firstPartitionCommit2FileSlices, + List secondPartitionCommit2FileSlices, + HoodieWriteConfig cfg, HoodieWriteClient client, + boolean commitSecondUpsert) throws IOException { + //just generate two partitions + dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); + //1. prepare data + HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2); + JavaRDD writeRecords = jsc.parallelize(records, 1); + JavaRDD statuses = client.upsert(writeRecords, newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + client.commit(newCommitTime, statuses); + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + List records2 = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records2, 1), newCommitTime); + Assertions.assertNoWriteErrors(statuses.collect()); + if (commitSecondUpsert) { + client.commit(newCommitTime, statuses); + } + + //2. assert filegroup and get the first partition fileslice + HoodieTable table = this.getHoodieTable(metaClient, cfg); + SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient()); + List firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList()); + assertEquals(1, firstPartitionCommit2FileGroups.size()); + firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList())); + //3. 
assert filegroup and get the second partition fileslice + List secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList()); + assertEquals(1, secondPartitionCommit2FileGroups.size()); + secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList())); + + //4. assert fileslice + HoodieTableType tableType = metaClient.getTableType(); + if (tableType.equals(HoodieTableType.COPY_ON_WRITE)) { + assertEquals(2, firstPartitionCommit2FileSlices.size()); + assertEquals(2, secondPartitionCommit2FileSlices.size()); + } else { + assertEquals(1, firstPartitionCommit2FileSlices.size()); + assertEquals(1, secondPartitionCommit2FileSlices.size()); + } + return Pair.of(records, records2); + } + + private void prepForDowngrade() throws IOException { + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + try (FSDataOutputStream os = metaClient.getFs().create(propertyFile)) { + metaClient.getTableConfig().getProperties().store(os, ""); + } + } + + private void createResidualFile() throws IOException { + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + Path updatedPropertyFile = new Path(metaClient.getMetaPath() + "/" + UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE); + + // Step1: Copy hoodie.properties to hoodie.properties.orig + FileUtil.copy(metaClient.getFs(), propertyFile, metaClient.getFs(), updatedPropertyFile, + false, metaClient.getHadoopConf()); + } + + private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException { + Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); + // Load the properties and verify + FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile); 
+ Properties prop = new Properties(); + prop.load(fsDataInputStream); + fsDataInputStream.close(); + assertEquals(Integer.toString(expectedVersion.versionCode()), prop.getProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME)); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 2526d953e..6db6529f2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -309,4 +309,29 @@ public class HoodieClientTestUtils { f.createNewFile(); return f.getAbsolutePath(); } + + public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException { + createTempFolderForMarkerFiles(basePath); + String folderPath = getTempFolderName(basePath); + // create dir for this instant + new File(folderPath + "/" + instantTime + "/" + partitionPath).mkdirs(); + new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile(); + } + + public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) { + String folderPath = getTempFolderName(basePath); + File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath); + if (markerDir.exists()) { + return markerDir.listFiles((dir, name) -> name.contains(".marker.MERGE")).length; + } + return 0; + } + + public static void createTempFolderForMarkerFiles(String basePath) { + new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs(); + } + + public static String getTempFolderName(String basePath) { + return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index f188f3a86..7e2dffbfe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.FSDataInputStream; @@ -41,9 +42,8 @@ import java.util.Properties; import java.util.stream.Collectors; /** - * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are - * loaded from hoodie.properties, these properties are usually set during initializing a path as hoodie base path and - * never changes during the lifetime of a hoodie table. + * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during + * initializing a path as hoodie base path and never changes during the lifetime of a hoodie table. 
* * @see HoodieTableMetaClient * @since 0.3.0 @@ -55,6 +55,7 @@ public class HoodieTableConfig implements Serializable { public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties"; public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name"; public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type"; + public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version"; @Deprecated public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format"; @Deprecated @@ -68,13 +69,13 @@ public class HoodieTableConfig implements Serializable { public static final String HOODIE_BOOTSTRAP_BASE_PATH = "hoodie.bootstrap.base.path"; public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE; + public static final HoodieTableVersion DEFAULT_TABLE_VERSION = HoodieTableVersion.ZERO; public static final HoodieFileFormat DEFAULT_BASE_FILE_FORMAT = HoodieFileFormat.PARQUET; public static final HoodieFileFormat DEFAULT_LOG_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG; public static final String DEFAULT_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName(); public static final String DEFAULT_BOOTSTRAP_INDEX_CLASS = HFileBootstrapIndex.class.getName(); - - public static final Integer DEFAULT_TIMELINE_LAYOUT_VERSION = TimelineLayoutVersion.VERSION_0; public static final String DEFAULT_ARCHIVELOG_FOLDER = ""; + private Properties props; public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { @@ -96,6 +97,8 @@ public class HoodieTableConfig implements Serializable { throw new HoodieIOException("Could not load Hoodie properties from " + propertyPath, e); } this.props = props; + ValidationUtils.checkArgument(props.containsKey(HOODIE_TABLE_TYPE_PROP_NAME) && props.containsKey(HOODIE_TABLE_NAME_PROP_NAME), + "hoodie.properties file seems invalid. 
Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry"); } public HoodieTableConfig(Properties props) { @@ -107,7 +110,8 @@ public class HoodieTableConfig implements Serializable { * * @deprecated */ - public HoodieTableConfig() {} + public HoodieTableConfig() { + } /** * Initialize the hoodie meta directory and any necessary files inside the meta (including the hoodie.properties). @@ -160,6 +164,19 @@ public class HoodieTableConfig implements Serializable { : Option.empty(); } + /** + * @return the hoodie.table.version from hoodie.properties file. + */ + public HoodieTableVersion getTableVersion() { + return props.containsKey(HOODIE_TABLE_VERSION_PROP_NAME) + ? HoodieTableVersion.versionFromCode(Integer.parseInt(props.getProperty(HOODIE_TABLE_VERSION_PROP_NAME))) + : DEFAULT_TABLE_VERSION; + } + + public void setTableVersion(HoodieTableVersion tableVersion) { + props.put(HOODIE_TABLE_VERSION_PROP_NAME, Integer.toString(tableVersion.versionCode())); + } + /** * Read the payload class for HoodieRecords from the table properties. 
*/ @@ -231,4 +248,8 @@ public class HoodieTableConfig implements Serializable { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); } + + public Properties getProperties() { + return props; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index a74728eb5..42fb3e9df 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -114,7 +114,8 @@ public class HoodieTableMetaClient implements Serializable { } public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, - ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, String payloadClassName) { + ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, + String payloadClassName) { LOG.info("Loading HoodieTableMetaClient from " + basePath); this.basePath = basePath; this.consistencyGuardConfig = consistencyGuardConfig; @@ -362,6 +363,7 @@ public class HoodieTableMetaClient implements Serializable { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); + properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME, String.valueOf(HoodieTableVersion.current().versionCode())); if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != null) { properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName); } @@ -614,8 +616,4 @@ public class HoodieTableMetaClient implements Serializable { public void setActiveTimeline(HoodieActiveTimeline activeTimeline) { this.activeTimeline = activeTimeline; } - - public void 
setTableConfig(HoodieTableConfig tableConfig) { - this.tableConfig = tableConfig; - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java new file mode 100644 index 000000000..eb2e200de --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table; + +import org.apache.hudi.exception.HoodieException; + +import java.util.Arrays; + +/** + * Table's version that controls what version of writer/readers can actually read/write + * to a given table. 
+ */ +public enum HoodieTableVersion { + // < 0.6.0 versions + ZERO(0), + // 0.6.0 onwards + ONE(1); + + private final int versionCode; + + HoodieTableVersion(int versionCode) { + this.versionCode = versionCode; + } + + public int versionCode() { + return versionCode; + } + + public static HoodieTableVersion current() { + return ONE; + } + + static HoodieTableVersion versionFromCode(int versionCode) { + return Arrays.stream(HoodieTableVersion.values()) + .filter(v -> v.versionCode == versionCode).findAny() + .orElseThrow(() -> new HoodieException("Unknown versionCode:" + versionCode)); + } +}