From fcc497eff11f128b2c1feaa5ced6c340b6b41849 Mon Sep 17 00:00:00 2001 From: lw0090 Date: Wed, 23 Sep 2020 00:57:20 +0800 Subject: [PATCH] [HUDI-1268] fix UpgradeDowngrade fs Rename issue for hdfs and aliyun oss (#2099) --- .../hudi/table/upgrade/UpgradeDowngrade.java | 8 ++++++++ .../table/upgrade/TestUpgradeDowngrade.java | 15 ++++++++++++--- .../testutils/HoodieClientTestHarness.java | 19 +++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index 0783e84e5..53af1775c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -125,6 +125,14 @@ public class UpgradeDowngrade { metaClient.getTableConfig().setTableVersion(toVersion); createUpdatedFile(metaClient.getTableConfig().getProperties()); + // because fs.rename behaves differently across file systems, such as: + // a) for hdfs: if propsFilePath already exists, fs.rename will not replace propsFilePath, but just return false + // b) for localfs: if propsFilePath already exists, fs.rename will replace propsFilePath, and return true + // c) for aliyun ossfs: if propsFilePath already exists, it will throw FileAlreadyExistsException + // so we should delete the old propsFilePath first; also, upgrade and downgrade are idempotent + if (fs.exists(propsFilePath)) { + fs.delete(propsFilePath, false); + } // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores. // But as long as this does not leave a partial hoodie.properties file, we are okay. 
fs.rename(updatedPropsFilePath, propsFilePath); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index f40bd56e0..e1dc4cefa 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -82,6 +83,14 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { return Stream.of(data).map(Arguments::of); } + @BeforeEach + public void setUp() throws Exception { + initSparkContexts(); + initDFS(); + initTestDataGenerator(); + initDFSMetaClient(); + } + @Test public void testLeftOverUpdatedPropFileCleanup() throws IOException { testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); @@ -98,7 +107,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { Map params = new HashMap<>(); if (tableType == HoodieTableType.MERGE_ON_READ) { params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); - metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, HoodieTableType.MERGE_ON_READ); } HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); HoodieWriteClient client = getHoodieWriteClient(cfg); @@ -143,7 +152,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be 
rolledback. assertRows(inputRecords.getKey(), thirdBatch); if (induceResiduesFromPrevUpgrade) { - assertFalse(fs.exists(new Path(metaClient.getMetaPath(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE))); + assertFalse(dfs.exists(new Path(metaClient.getMetaPath(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE))); } } @@ -332,7 +341,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase { //just generate two partitions dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); //1. prepare data - HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); + HoodieTestDataGenerator.writePartitionMetadata(dfs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, dfsBasePath); /** * Write 1 (only inserts) */ diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 2a5f47692..f1e3f175e 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -251,9 +251,28 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im // Create a temp folder as the base path dfs = dfsCluster.getFileSystem(); dfsBasePath = dfs.getWorkingDirectory().toString(); + this.basePath = dfsBasePath; + this.hadoopConf = dfs.getConf(); dfs.mkdirs(new Path(dfsBasePath)); } + /** + * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by + * {@code getTableType()}. 
+ * + * @throws IOException + */ + protected void initDFSMetaClient() throws IOException { + if (dfsBasePath == null) { + throw new IllegalStateException("The base path has not been initialized."); + } + + if (jsc == null) { + throw new IllegalStateException("The Spark context has not been initialized."); + } + metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, getTableType()); + } + /** * Cleanups the distributed file system. *