1
0

[HUDI-1268] fix UpgradeDowngrade fs Rename issue for hdfs and aliyun oss (#2099)

This commit is contained in:
lw0090
2020-09-23 00:57:20 +08:00
committed by GitHub
parent 8087016504
commit fcc497eff1
3 changed files with 39 additions and 3 deletions

View File

@@ -125,6 +125,14 @@ public class UpgradeDowngrade {
metaClient.getTableConfig().setTableVersion(toVersion); metaClient.getTableConfig().setTableVersion(toVersion);
createUpdatedFile(metaClient.getTableConfig().getProperties()); createUpdatedFile(metaClient.getTableConfig().getProperties());
// because fs.rename behaves differently on different filesystems, such as:
// a) for hdfs: if propsFilePath already exists, fs.rename will not replace propsFilePath, but just return false
// b) for localfs: if propsFilePath already exists, fs.rename will replace propsFilePath, and return true
// c) for aliyun ossfs: if propsFilePath already exists, it will throw FileAlreadyExistsException
// so we should delete the old propsFilePath first. Also, upgrade and downgrade are idempotent
if (fs.exists(propsFilePath)) {
fs.delete(propsFilePath, false);
}
// Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores. // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
// But as long as this does not leave a partial hoodie.properties file, we are okay. // But as long as this does not leave a partial hoodie.properties file, we are okay.
fs.rename(updatedPropsFilePath, propsFilePath); fs.rename(updatedPropsFilePath, propsFilePath);

View File

@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
@@ -82,6 +83,14 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
return Stream.of(data).map(Arguments::of); return Stream.of(data).map(Arguments::of);
} }
@BeforeEach
// Runs before each test: brings up Spark contexts, a MiniDFSCluster-backed
// file system (see initDFS in HoodieClientTestHarness), the test data
// generator, and a meta client rooted on the DFS base path.
// NOTE(review): initDFSMetaClient() requires dfsBasePath and jsc, so it must
// stay after initSparkContexts() and initDFS().
public void setUp() throws Exception {
initSparkContexts();
initDFS();
initTestDataGenerator();
initDFSMetaClient();
}
@Test @Test
public void testLeftOverUpdatedPropFileCleanup() throws IOException { public void testLeftOverUpdatedPropFileCleanup() throws IOException {
testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
@@ -98,7 +107,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
if (tableType == HoodieTableType.MERGE_ON_READ) { if (tableType == HoodieTableType.MERGE_ON_READ) {
params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name()); params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name());
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, HoodieTableType.MERGE_ON_READ);
} }
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build(); HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params).build();
HoodieWriteClient client = getHoodieWriteClient(cfg); HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -143,7 +152,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
// Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback. // Check the entire dataset has all records only from 1st commit and 3rd commit since 2nd is expected to be rolledback.
assertRows(inputRecords.getKey(), thirdBatch); assertRows(inputRecords.getKey(), thirdBatch);
if (induceResiduesFromPrevUpgrade) { if (induceResiduesFromPrevUpgrade) {
assertFalse(fs.exists(new Path(metaClient.getMetaPath(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE))); assertFalse(dfs.exists(new Path(metaClient.getMetaPath(), UpgradeDowngrade.HOODIE_UPDATED_PROPERTY_FILE)));
} }
} }
@@ -332,7 +341,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
//just generate two partitions //just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}); dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
//1. prepare data //1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath); HoodieTestDataGenerator.writePartitionMetadata(dfs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, dfsBasePath);
/** /**
* Write 1 (only inserts) * Write 1 (only inserts)
*/ */

View File

@@ -251,9 +251,28 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
// Create a temp folder as the base path // Create a temp folder as the base path
dfs = dfsCluster.getFileSystem(); dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString(); dfsBasePath = dfs.getWorkingDirectory().toString();
this.basePath = dfsBasePath;
this.hadoopConf = dfs.getConf();
dfs.mkdirs(new Path(dfsBasePath)); dfs.mkdirs(new Path(dfsBasePath));
} }
/**
 * Initializes an instance of {@link HoodieTableMetaClient} on the DFS base path,
 * using the table type returned by {@code getTableType()}.
 *
 * <p>Requires {@code initDFS()} (sets {@code dfsBasePath}/{@code dfs}) and
 * {@code initSparkContexts()} (sets {@code jsc}) to have been called first.
 *
 * @throws IOException if the meta client cannot be initialized on DFS
 * @throws IllegalStateException if the base path or Spark context is not initialized
 */
protected void initDFSMetaClient() throws IOException {
if (dfsBasePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
if (jsc == null) {
throw new IllegalStateException("The Spark context has not been initialized.");
}
metaClient = HoodieTestUtils.init(dfs.getConf(), dfsBasePath, getTableType());
}
/** /**
* Cleanups the distributed file system. * Cleanups the distributed file system.
* *