[HUDI-265] Failed to delete tmp dirs created in unit tests (#928)
This commit is contained in:
@@ -17,7 +17,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi;
|
package org.apache.hudi;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
@@ -29,30 +28,27 @@ import org.apache.hadoop.fs.Path;
|
|||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hudi.common.HoodieClientTestUtils;
|
import org.apache.hudi.common.HoodieClientTestUtils;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.HoodieTestDataGenerator;
|
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||||
import org.apache.hudi.common.minicluster.HdfsTestService;
|
import org.apache.hudi.common.minicluster.HdfsTestService;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.util.FSUtils;
|
import org.apache.hudi.common.util.FSUtils;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.sql.SQLContext;
|
import org.apache.spark.sql.SQLContext;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The test harness for resource initialization and cleanup.
|
* The test harness for resource initialization and cleanup.
|
||||||
*/
|
*/
|
||||||
public abstract class HoodieClientTestHarness implements Serializable {
|
public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(HoodieClientTestHarness.class);
|
private static final Logger logger = LoggerFactory.getLogger(HoodieClientTestHarness.class);
|
||||||
|
|
||||||
protected transient JavaSparkContext jsc = null;
|
protected transient JavaSparkContext jsc = null;
|
||||||
protected transient SQLContext sqlContext;
|
protected transient SQLContext sqlContext;
|
||||||
protected transient FileSystem fs;
|
protected transient FileSystem fs;
|
||||||
protected String basePath = null;
|
|
||||||
protected TemporaryFolder folder = null;
|
|
||||||
protected transient HoodieTestDataGenerator dataGen = null;
|
protected transient HoodieTestDataGenerator dataGen = null;
|
||||||
protected transient ExecutorService executorService;
|
protected transient ExecutorService executorService;
|
||||||
protected transient HoodieTableMetaClient metaClient;
|
protected transient HoodieTableMetaClient metaClient;
|
||||||
@@ -69,7 +65,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void initResources() throws IOException {
|
public void initResources() throws IOException {
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initSparkContexts();
|
initSparkContexts();
|
||||||
initTestDataGenerator();
|
initTestDataGenerator();
|
||||||
initFileSystem();
|
initFileSystem();
|
||||||
@@ -85,7 +81,6 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
|||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTestDataGenerator();
|
cleanupTestDataGenerator();
|
||||||
cleanupFileSystem();
|
cleanupFileSystem();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -129,33 +124,6 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes a temporary folder and base path.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
protected void initTempFolderAndPath() throws IOException {
|
|
||||||
folder = new TemporaryFolder();
|
|
||||||
folder.create();
|
|
||||||
basePath = folder.getRoot().getAbsolutePath();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Cleanups the temporary folder and base path.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
protected void cleanupTempFolderAndPath() throws IOException {
|
|
||||||
if (basePath != null) {
|
|
||||||
new File(basePath).delete();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (folder != null) {
|
|
||||||
logger.info("Explicitly removing workspace used in previously run test-case");
|
|
||||||
folder.delete();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes a file system with the hadoop configuration of Spark context.
|
* Initializes a file system with the hadoop configuration of Spark context.
|
||||||
*/
|
*/
|
||||||
@@ -229,16 +197,6 @@ public abstract class HoodieClientTestHarness implements Serializable {
|
|||||||
dataGen = null;
|
dataGen = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets a default {@link HoodieTableType#COPY_ON_WRITE} table type.
|
|
||||||
* Sub-classes can override this method to specify a new table type.
|
|
||||||
*
|
|
||||||
* @return an instance of Hoodie table type.
|
|
||||||
*/
|
|
||||||
protected HoodieTableType getTableType() {
|
|
||||||
return HoodieTableType.COPY_ON_WRITE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes a distributed file system and base directory.
|
* Initializes a distributed file system and base directory.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -51,18 +51,17 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initSparkContexts();
|
initSparkContexts();
|
||||||
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, MERGE_ON_READ);
|
metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, MERGE_ON_READ);
|
||||||
client = new CompactionAdminClient(jsc, basePath);
|
client = new CompactionAdminClient(jsc, basePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() {
|
||||||
client.close();
|
client.close();
|
||||||
metaClient = null;
|
metaClient = null;
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi;
|
package org.apache.hudi;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@@ -33,15 +32,14 @@ import org.junit.Test;
|
|||||||
public class TestConsistencyGuard extends HoodieClientTestHarness {
|
public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() {
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initFileSystemWithDefaultConfiguration();
|
initFileSystemWithDefaultConfiguration();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupFileSystem();
|
cleanupFileSystem();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -53,14 +53,13 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath);
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath);
|
||||||
initSparkContexts("TestUpdateMapFunction");
|
initSparkContexts("TestUpdateMapFunction");
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() {
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
|
|||||||
hbaseConfig = utility.getConnection().getConfiguration();
|
hbaseConfig = utility.getConnection().getConfiguration();
|
||||||
initSparkContexts("TestQPSResourceAllocator");
|
initSparkContexts("TestQPSResourceAllocator");
|
||||||
|
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
|
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
|
||||||
// Initialize table
|
// Initialize table
|
||||||
initMetaClient();
|
initMetaClient();
|
||||||
@@ -59,7 +59,6 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
|
|||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
if (utility != null) {
|
if (utility != null) {
|
||||||
utility.shutdownMiniCluster();
|
utility.shutdownMiniCluster();
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
|||||||
jsc.hadoopConfiguration().addResource(utility.getConfiguration());
|
jsc.hadoopConfiguration().addResource(utility.getConfiguration());
|
||||||
|
|
||||||
// Create a temp folder as the base path
|
// Create a temp folder as the base path
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initTestDataGenerator();
|
initTestDataGenerator();
|
||||||
initMetaClient();
|
initMetaClient();
|
||||||
}
|
}
|
||||||
@@ -110,7 +110,6 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
|||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupTestDataGenerator();
|
cleanupTestDataGenerator();
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,14 +35,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initSparkContexts("TestHoodieIndex");
|
initSparkContexts("TestHoodieIndex");
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initMetaClient();
|
initMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -89,7 +89,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initSparkContexts("TestHoodieBloomIndex");
|
initSparkContexts("TestHoodieBloomIndex");
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initFileSystem();
|
initFileSystem();
|
||||||
// We have some records to be tagged (two different partitions)
|
// We have some records to be tagged (two different partitions)
|
||||||
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
|
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
|
||||||
@@ -101,7 +101,6 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
|
|||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupFileSystem();
|
cleanupFileSystem();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initSparkContexts("TestHoodieGlobalBloomIndex");
|
initSparkContexts("TestHoodieGlobalBloomIndex");
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
// We have some records to be tagged (two different partitions)
|
// We have some records to be tagged (two different partitions)
|
||||||
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
|
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
|
||||||
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
|
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
|
||||||
@@ -72,9 +72,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
|
|||||||
@Before
|
@Before
|
||||||
public void init() throws Exception {
|
public void init() throws Exception {
|
||||||
initDFS();
|
initDFS();
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initSparkContexts("TestHoodieCommitArchiveLog");
|
initSparkContexts("TestHoodieCommitArchiveLog");
|
||||||
hadoopConf = dfs.getConf();
|
hadoopConf = dfs.getConf();
|
||||||
jsc.hadoopConfiguration().addResource(dfs.getConf());
|
jsc.hadoopConfiguration().addResource(dfs.getConf());
|
||||||
@@ -70,7 +70,6 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
|
|||||||
@After
|
@After
|
||||||
public void clean() throws IOException {
|
public void clean() throws IOException {
|
||||||
cleanupDFS();
|
cleanupDFS();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
|||||||
initSparkContexts("TestHoodieCompactor");
|
initSparkContexts("TestHoodieCompactor");
|
||||||
|
|
||||||
// Create a temp folder as the base path
|
// Create a temp folder as the base path
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
|
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
|
||||||
fs = FSUtils.getFs(basePath, hadoopConf);
|
fs = FSUtils.getFs(basePath, hadoopConf);
|
||||||
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
|
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
|
||||||
@@ -71,7 +71,6 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
|||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupFileSystem();
|
cleanupFileSystem();
|
||||||
cleanupTestDataGenerator();
|
cleanupTestDataGenerator();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initSparkContexts("TestHoodieMergeHandle");
|
initSparkContexts("TestHoodieMergeHandle");
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initFileSystem();
|
initFileSystem();
|
||||||
initTestDataGenerator();
|
initTestDataGenerator();
|
||||||
initMetaClient();
|
initMetaClient();
|
||||||
@@ -66,7 +66,6 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
|||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupFileSystem();
|
cleanupFileSystem();
|
||||||
cleanupTestDataGenerator();
|
cleanupTestDataGenerator();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
initSparkContexts("TestCopyOnWriteTable");
|
initSparkContexts("TestCopyOnWriteTable");
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
initMetaClient();
|
initMetaClient();
|
||||||
initTestDataGenerator();
|
initTestDataGenerator();
|
||||||
initFileSystem();
|
initFileSystem();
|
||||||
@@ -81,7 +81,6 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
|
|||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupMetaClient();
|
cleanupMetaClient();
|
||||||
cleanupFileSystem();
|
cleanupFileSystem();
|
||||||
cleanupTestDataGenerator();
|
cleanupTestDataGenerator();
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
initDFS();
|
initDFS();
|
||||||
initSparkContexts("TestHoodieMergeOnReadTable");
|
initSparkContexts("TestHoodieMergeOnReadTable");
|
||||||
jsc.hadoopConfiguration().addResource(dfs.getConf());
|
jsc.hadoopConfiguration().addResource(dfs.getConf());
|
||||||
initTempFolderAndPath();
|
initPath();
|
||||||
dfs.mkdirs(new Path(basePath));
|
dfs.mkdirs(new Path(basePath));
|
||||||
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
|
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
|
||||||
initTestDataGenerator();
|
initTestDataGenerator();
|
||||||
@@ -92,7 +92,6 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
@After
|
@After
|
||||||
public void clean() throws IOException {
|
public void clean() throws IOException {
|
||||||
cleanupDFS();
|
cleanupDFS();
|
||||||
cleanupTempFolderAndPath();
|
|
||||||
cleanupSparkContexts();
|
cleanupSparkContexts();
|
||||||
cleanupTestDataGenerator();
|
cleanupTestDataGenerator();
|
||||||
}
|
}
|
||||||
@@ -968,6 +967,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
// Rollback again to pretend the first rollback failed partially. This should not error our
|
// Rollback again to pretend the first rollback failed partially. This should not error our
|
||||||
writeClient.rollback(newCommitTime);
|
writeClient.rollback(newCommitTime);
|
||||||
|
folder.delete();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.common;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.SyncableFileSystemView;
|
||||||
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
public class HoodieCommonTestHarness {
|
||||||
|
|
||||||
|
protected String basePath = null;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder folder = new TemporaryFolder();
|
||||||
|
|
||||||
|
protected transient HoodieTableMetaClient metaClient;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes basePath.
|
||||||
|
*/
|
||||||
|
protected void initPath() {
|
||||||
|
this.basePath = folder.getRoot().getAbsolutePath();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes an instance of {@link HoodieTableMetaClient} with a special table type
|
||||||
|
* specified by {@code getTableType()}.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected void initMetaClient() throws IOException {
|
||||||
|
metaClient = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
|
||||||
|
basePath = metaClient.getBasePath();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void refreshFsView() throws IOException {
|
||||||
|
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException {
|
||||||
|
return getFileSystemView(timeline, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline, boolean enableIncrementalTimelineSync) {
|
||||||
|
return new HoodieTableFileSystemView(metaClient, timeline, enableIncrementalTimelineSync);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient)
|
||||||
|
throws IOException {
|
||||||
|
return getFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline)
|
||||||
|
throws IOException {
|
||||||
|
return getFileSystemView(timeline, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets a default {@link HoodieTableType#COPY_ON_WRITE} table type.
|
||||||
|
* Sub-classes can override this method to specify a new table type.
|
||||||
|
*
|
||||||
|
* @return an instance of Hoodie table type.
|
||||||
|
*/
|
||||||
|
protected HoodieTableType getTableType() {
|
||||||
|
return HoodieTableType.COPY_ON_WRITE;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -31,28 +31,20 @@ import org.apache.hadoop.fs.Path;
|
|||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class HoodieTableMetaClientTest {
|
public class HoodieTableMetaClientTest extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private HoodieTableMetaClient metaClient;
|
|
||||||
private String basePath;
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
|
initMetaClient();
|
||||||
basePath = metaClient.getBasePath();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.minicluster.MiniClusterUtil;
|
import org.apache.hudi.common.minicluster.MiniClusterUtil;
|
||||||
import org.apache.hudi.common.model.HoodieArchivedLogFile;
|
import org.apache.hudi.common.model.HoodieArchivedLogFile;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
@@ -65,21 +66,20 @@ import org.apache.hudi.common.util.FSUtils;
|
|||||||
import org.apache.hudi.common.util.HoodieAvroUtils;
|
import org.apache.hudi.common.util.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||||
import org.apache.hudi.exception.CorruptedLogFileException;
|
import org.apache.hudi.exception.CorruptedLogFileException;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
@SuppressWarnings("Duplicates")
|
@SuppressWarnings("Duplicates")
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class HoodieLogFormatTest {
|
public class HoodieLogFormatTest extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static final String BASE_OUTPUT_PATH = "/tmp/";
|
private static String BASE_OUTPUT_PATH = "/tmp/";
|
||||||
private static String basePath;
|
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private Path partitionPath;
|
private Path partitionPath;
|
||||||
private int bufferSize = 4096;
|
private int bufferSize = 4096;
|
||||||
@@ -108,8 +108,7 @@ public class HoodieLogFormatTest {
|
|||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException, InterruptedException {
|
public void setUp() throws IOException, InterruptedException {
|
||||||
this.fs = MiniClusterUtil.fileSystem;
|
this.fs = MiniClusterUtil.fileSystem;
|
||||||
TemporaryFolder folder = new TemporaryFolder();
|
|
||||||
folder.create();
|
|
||||||
assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
|
assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
|
||||||
this.partitionPath = new Path(folder.getRoot().getPath());
|
this.partitionPath = new Path(folder.getRoot().getPath());
|
||||||
this.basePath = folder.getRoot().getParent();
|
this.basePath = folder.getRoot().getParent();
|
||||||
|
|||||||
@@ -24,8 +24,9 @@ import static org.junit.Assert.assertTrue;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
||||||
import org.apache.hudi.common.table.HoodieTimeline;
|
import org.apache.hudi.common.table.HoodieTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -34,20 +35,16 @@ import org.junit.Before;
|
|||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class HoodieActiveTimelineTest {
|
public class HoodieActiveTimelineTest extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private HoodieActiveTimeline timeline;
|
private HoodieActiveTimeline timeline;
|
||||||
private HoodieTableMetaClient metaClient;
|
|
||||||
@Rule
|
@Rule
|
||||||
public final ExpectedException exception = ExpectedException.none();
|
public final ExpectedException exception = ExpectedException.none();
|
||||||
@Rule
|
|
||||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
this.metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
|
initMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ import java.util.stream.Stream;
|
|||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.CompactionOperation;
|
import org.apache.hudi.common.model.CompactionOperation;
|
||||||
import org.apache.hudi.common.model.FileSlice;
|
import org.apache.hudi.common.model.FileSlice;
|
||||||
import org.apache.hudi.common.model.HoodieDataFile;
|
import org.apache.hudi.common.model.HoodieDataFile;
|
||||||
@@ -45,8 +46,6 @@ import org.apache.hudi.common.model.HoodieFileGroup;
|
|||||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||||
import org.apache.hudi.common.model.HoodieLogFile;
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
||||||
import org.apache.hudi.common.table.HoodieTimeline;
|
import org.apache.hudi.common.table.HoodieTimeline;
|
||||||
import org.apache.hudi.common.table.SyncableFileSystemView;
|
import org.apache.hudi.common.table.SyncableFileSystemView;
|
||||||
import org.apache.hudi.common.table.TableFileSystemView;
|
import org.apache.hudi.common.table.TableFileSystemView;
|
||||||
@@ -62,50 +61,38 @@ import org.apache.log4j.LogManager;
|
|||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
@SuppressWarnings("ResultOfMethodCallIgnored")
|
@SuppressWarnings("ResultOfMethodCallIgnored")
|
||||||
public class HoodieTableFileSystemViewTest {
|
public class HoodieTableFileSystemViewTest extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static final transient Logger log = LogManager.getLogger(HoodieTableFileSystemViewTest.class);
|
private static final transient Logger log = LogManager.getLogger(HoodieTableFileSystemViewTest.class);
|
||||||
|
|
||||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
|
|
||||||
protected HoodieTableMetaClient metaClient;
|
|
||||||
protected String basePath;
|
|
||||||
protected SyncableFileSystemView fsView;
|
protected SyncableFileSystemView fsView;
|
||||||
protected TableFileSystemView.ReadOptimizedView roView;
|
protected TableFileSystemView.ReadOptimizedView roView;
|
||||||
protected TableFileSystemView.RealtimeView rtView;
|
protected TableFileSystemView.RealtimeView rtView;
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
initializeMetaClient();
|
initMetaClient();
|
||||||
refreshFsView();
|
refreshFsView();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initializeMetaClient() throws IOException {
|
|
||||||
metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
|
|
||||||
basePath = metaClient.getBasePath();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException {
|
|
||||||
return new HoodieTableFileSystemView(metaClient, timeline);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void refreshFsView() throws IOException {
|
protected void refreshFsView() throws IOException {
|
||||||
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
|
super.refreshFsView();
|
||||||
|
closeFsView();
|
||||||
|
fsView = getFileSystemView(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
|
||||||
|
roView = fsView;
|
||||||
|
rtView = fsView;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeFsView() {
|
||||||
if (null != fsView) {
|
if (null != fsView) {
|
||||||
fsView.close();
|
fsView.close();
|
||||||
fsView = null;
|
fsView = null;
|
||||||
}
|
}
|
||||||
fsView = getFileSystemView(metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
|
|
||||||
roView = (TableFileSystemView.ReadOptimizedView) fsView;
|
|
||||||
rtView = (TableFileSystemView.RealtimeView) fsView;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -1185,4 +1172,9 @@ public class HoodieTableFileSystemViewTest {
|
|||||||
Assert.assertEquals(1, fileIdsInCompaction.size());
|
Assert.assertEquals(1, fileIdsInCompaction.size());
|
||||||
Assert.assertTrue(fileIdsInCompaction.contains(fileId));
|
Assert.assertTrue(fileIdsInCompaction.contains(fileId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected HoodieTableType getTableType() {
|
||||||
|
return HoodieTableType.MERGE_ON_READ;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
|||||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
import org.apache.hudi.common.HoodieCleanStat;
|
import org.apache.hudi.common.HoodieCleanStat;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.HoodieRollbackStat;
|
import org.apache.hudi.common.HoodieRollbackStat;
|
||||||
import org.apache.hudi.common.model.CompactionOperation;
|
import org.apache.hudi.common.model.CompactionOperation;
|
||||||
import org.apache.hudi.common.model.FileSlice;
|
import org.apache.hudi.common.model.FileSlice;
|
||||||
@@ -50,7 +51,6 @@ import org.apache.hudi.common.model.HoodieDataFile;
|
|||||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.HoodieTimeline;
|
import org.apache.hudi.common.table.HoodieTimeline;
|
||||||
@@ -67,62 +67,36 @@ import org.apache.log4j.LogManager;
|
|||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class IncrementalFSViewSyncTest {
|
public class IncrementalFSViewSyncTest extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static final transient Logger log = LogManager.getLogger(IncrementalFSViewSyncTest.class);
|
private static final transient Logger log = LogManager.getLogger(IncrementalFSViewSyncTest.class);
|
||||||
|
|
||||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
|
|
||||||
protected HoodieTableMetaClient metaClient;
|
|
||||||
protected String basePath;
|
|
||||||
|
|
||||||
private final List<String> partitions = Arrays.asList("2018/01/01", "2018/01/02",
|
private final List<String> partitions = Arrays.asList("2018/01/01", "2018/01/02",
|
||||||
"2019/03/01");
|
"2019/03/01");
|
||||||
private final List<String> fileIdsPerPartition =
|
private final List<String> fileIdsPerPartition =
|
||||||
IntStream.range(0, 10).mapToObj(x -> UUID.randomUUID().toString()).collect(Collectors.toList());
|
IntStream.range(0, 10).mapToObj(x -> UUID.randomUUID().toString()).collect(Collectors.toList());
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
initializeMetaClient();
|
initMetaClient();
|
||||||
refreshFsView();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void initializeMetaClient() throws IOException {
|
|
||||||
metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
|
|
||||||
basePath = metaClient.getBasePath();
|
|
||||||
partitions.forEach(p -> new File(basePath + "/" + p).mkdirs());
|
partitions.forEach(p -> new File(basePath + "/" + p).mkdirs());
|
||||||
}
|
refreshFsView();
|
||||||
|
|
||||||
protected void refreshFsView() throws IOException {
|
|
||||||
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected SyncableFileSystemView getNewFileSystemView(HoodieTableMetaClient metaClient) throws IOException {
|
|
||||||
return getNewFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected SyncableFileSystemView getNewFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline)
|
|
||||||
throws IOException {
|
|
||||||
return new HoodieTableFileSystemView(metaClient, timeline, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyPartitionsAndTimeline() throws IOException {
|
public void testEmptyPartitionsAndTimeline() throws IOException {
|
||||||
SyncableFileSystemView view = getNewFileSystemView(metaClient);
|
SyncableFileSystemView view = getFileSystemView(metaClient);
|
||||||
Assert.assertFalse(view.getLastInstant().isPresent());
|
Assert.assertFalse(view.getLastInstant().isPresent());
|
||||||
partitions.forEach(p -> Assert.assertEquals(0, view.getLatestFileSlices(p).count()));
|
partitions.forEach(p -> Assert.assertEquals(0, view.getLatestFileSlices(p).count()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAsyncCompaction() throws IOException {
|
public void testAsyncCompaction() throws IOException {
|
||||||
SyncableFileSystemView view = getNewFileSystemView(metaClient);
|
SyncableFileSystemView view = getFileSystemView(metaClient);
|
||||||
view.sync();
|
view.sync();
|
||||||
|
|
||||||
// Run 3 ingestion on MOR table (3 delta commits)
|
// Run 3 ingestion on MOR table (3 delta commits)
|
||||||
@@ -181,7 +155,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIngestion() throws IOException {
|
public void testIngestion() throws IOException {
|
||||||
SyncableFileSystemView view = getNewFileSystemView(metaClient);
|
SyncableFileSystemView view = getFileSystemView(metaClient);
|
||||||
|
|
||||||
// Add an empty ingestion
|
// Add an empty ingestion
|
||||||
String firstEmptyInstantTs = "11";
|
String firstEmptyInstantTs = "11";
|
||||||
@@ -198,7 +172,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
partitions.forEach(p -> Assert.assertEquals(0, view.getLatestFileSlices(p).count()));
|
partitions.forEach(p -> Assert.assertEquals(0, view.getLatestFileSlices(p).count()));
|
||||||
|
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
SyncableFileSystemView newView = getNewFileSystemView(metaClient);
|
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||||
for (String partition : partitions) {
|
for (String partition : partitions) {
|
||||||
newView.getAllFileGroups(partition).count();
|
newView.getAllFileGroups(partition).count();
|
||||||
}
|
}
|
||||||
@@ -225,7 +199,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
@Test
|
@Test
|
||||||
public void testMultipleTransitions() throws IOException {
|
public void testMultipleTransitions() throws IOException {
|
||||||
|
|
||||||
SyncableFileSystemView view1 = getNewFileSystemView(metaClient);
|
SyncableFileSystemView view1 = getFileSystemView(metaClient);
|
||||||
view1.sync();
|
view1.sync();
|
||||||
Map<String, List<String>> instantsToFiles = null;
|
Map<String, List<String>> instantsToFiles = null;
|
||||||
|
|
||||||
@@ -237,7 +211,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
testMultipleWriteSteps(view1, Arrays.asList("11"), true, "11");
|
testMultipleWriteSteps(view1, Arrays.asList("11"), true, "11");
|
||||||
|
|
||||||
SyncableFileSystemView view2 =
|
SyncableFileSystemView view2 =
|
||||||
getNewFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||||
|
|
||||||
// Run 2 more ingestion on MOR table. View1 is not yet synced but View2 is
|
// Run 2 more ingestion on MOR table. View1 is not yet synced but View2 is
|
||||||
instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11"));
|
instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11"));
|
||||||
@@ -247,7 +221,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
|
|
||||||
view2.sync();
|
view2.sync();
|
||||||
SyncableFileSystemView view3 =
|
SyncableFileSystemView view3 =
|
||||||
getNewFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||||
partitions.stream().forEach(p -> view3.getLatestFileSlices(p).count());
|
partitions.stream().forEach(p -> view3.getLatestFileSlices(p).count());
|
||||||
view3.sync();
|
view3.sync();
|
||||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
||||||
@@ -260,7 +234,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
view1.sync();
|
view1.sync();
|
||||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
||||||
SyncableFileSystemView view4 =
|
SyncableFileSystemView view4 =
|
||||||
getNewFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||||
partitions.stream().forEach(p -> view4.getLatestFileSlices(p).count());
|
partitions.stream().forEach(p -> view4.getLatestFileSlices(p).count());
|
||||||
view4.sync();
|
view4.sync();
|
||||||
|
|
||||||
@@ -275,7 +249,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
view1.sync();
|
view1.sync();
|
||||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
|
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
|
||||||
SyncableFileSystemView view5 =
|
SyncableFileSystemView view5 =
|
||||||
getNewFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||||
partitions.stream().forEach(p -> view5.getLatestFileSlices(p).count());
|
partitions.stream().forEach(p -> view5.getLatestFileSlices(p).count());
|
||||||
view5.sync();
|
view5.sync();
|
||||||
|
|
||||||
@@ -296,7 +270,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
view1.sync();
|
view1.sync();
|
||||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
|
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
|
||||||
SyncableFileSystemView view6 =
|
SyncableFileSystemView view6 =
|
||||||
getNewFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||||
partitions.stream().forEach(p -> view5.getLatestFileSlices(p).count());
|
partitions.stream().forEach(p -> view5.getLatestFileSlices(p).count());
|
||||||
view6.sync();
|
view6.sync();
|
||||||
|
|
||||||
@@ -375,7 +349,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
partitions.forEach(p -> Assert.assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count()));
|
partitions.forEach(p -> Assert.assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count()));
|
||||||
|
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
SyncableFileSystemView newView = getNewFileSystemView(metaClient);
|
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||||
for (String partition : partitions) {
|
for (String partition : partitions) {
|
||||||
newView.getAllFileGroups(partition).count();
|
newView.getAllFileGroups(partition).count();
|
||||||
}
|
}
|
||||||
@@ -427,7 +401,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
partitions.forEach(p -> Assert.assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count()));
|
partitions.forEach(p -> Assert.assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count()));
|
||||||
|
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
SyncableFileSystemView newView = getNewFileSystemView(metaClient);
|
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||||
for (String partition : partitions) {
|
for (String partition : partitions) {
|
||||||
newView.getAllFileGroups(partition).count();
|
newView.getAllFileGroups(partition).count();
|
||||||
}
|
}
|
||||||
@@ -559,7 +533,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
});
|
});
|
||||||
|
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
SyncableFileSystemView newView = getNewFileSystemView(metaClient);
|
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||||
partitions.forEach(p -> newView.getLatestFileSlices(p).count());
|
partitions.forEach(p -> newView.getLatestFileSlices(p).count());
|
||||||
areViewsConsistent(view, newView, initialExpTotalFileSlices + partitions.size() * fileIdsPerPartition.size());
|
areViewsConsistent(view, newView, initialExpTotalFileSlices + partitions.size() * fileIdsPerPartition.size());
|
||||||
}
|
}
|
||||||
@@ -681,7 +655,7 @@ public class IncrementalFSViewSyncTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
SyncableFileSystemView newView = getNewFileSystemView(metaClient);
|
SyncableFileSystemView newView = getFileSystemView(metaClient);
|
||||||
for (String partition : partitions) {
|
for (String partition : partitions) {
|
||||||
newView.getAllFileGroups(partition).count();
|
newView.getAllFileGroups(partition).count();
|
||||||
}
|
}
|
||||||
@@ -788,4 +762,10 @@ public class IncrementalFSViewSyncTest {
|
|||||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant).getFileName()));
|
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant).getFileName()));
|
||||||
return writeStats.stream().map(e -> e.getValue().getPath()).collect(Collectors.toList());
|
return writeStats.stream().map(e -> e.getValue().getPath()).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected HoodieTableType getTableType() {
|
||||||
|
return HoodieTableType.MERGE_ON_READ;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,10 +26,10 @@ import org.apache.hudi.common.table.SyncableFileSystemView;
|
|||||||
public class RocksDBBasedIncrementalFSViewSyncTest extends IncrementalFSViewSyncTest {
|
public class RocksDBBasedIncrementalFSViewSyncTest extends IncrementalFSViewSyncTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected SyncableFileSystemView getNewFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline)
|
protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return new RocksDbBasedFileSystemView(metaClient, timeline,
|
return new RocksDbBasedFileSystemView(metaClient, timeline,
|
||||||
FileSystemViewStorageConfig.newBuilder().withRocksDBPath(tmpFolder.newFolder().getAbsolutePath())
|
FileSystemViewStorageConfig.newBuilder().withRocksDBPath(folder.newFolder().getAbsolutePath())
|
||||||
.withIncrementalTimelineSync(true).build());
|
.withIncrementalTimelineSync(true).build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ public class RocksDbBasedFileSystemViewTest extends HoodieTableFileSystemViewTes
|
|||||||
|
|
||||||
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException {
|
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException {
|
||||||
return new RocksDbBasedFileSystemView(metaClient, timeline,
|
return new RocksDbBasedFileSystemView(metaClient, timeline,
|
||||||
FileSystemViewStorageConfig.newBuilder().withRocksDBPath(tmpFolder.newFolder().getAbsolutePath())
|
FileSystemViewStorageConfig.newBuilder().withRocksDBPath(folder.newFolder().getAbsolutePath())
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.SyncableFileSystemView;
|
|||||||
public class SpillableMapBasedIncrementalFSViewSyncTest extends IncrementalFSViewSyncTest {
|
public class SpillableMapBasedIncrementalFSViewSyncTest extends IncrementalFSViewSyncTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected SyncableFileSystemView getNewFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) {
|
protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) {
|
||||||
return new SpillableMapBasedFileSystemView(metaClient, timeline,
|
return new SpillableMapBasedFileSystemView(metaClient, timeline,
|
||||||
FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build());
|
FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,6 @@
|
|||||||
package org.apache.hudi.common.util;
|
package org.apache.hudi.common.util;
|
||||||
|
|
||||||
import static org.apache.hudi.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
|
import static org.apache.hudi.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
|
||||||
import static org.apache.hudi.common.model.HoodieTestUtils.getDefaultHadoopConf;
|
|
||||||
import static org.apache.hudi.common.util.CompactionTestUtils.createCompactionPlan;
|
import static org.apache.hudi.common.util.CompactionTestUtils.createCompactionPlan;
|
||||||
import static org.apache.hudi.common.util.CompactionTestUtils.scheduleCompaction;
|
import static org.apache.hudi.common.util.CompactionTestUtils.scheduleCompaction;
|
||||||
import static org.apache.hudi.common.util.CompactionTestUtils.setupAndValidateCompactionOperations;
|
import static org.apache.hudi.common.util.CompactionTestUtils.setupAndValidateCompactionOperations;
|
||||||
@@ -35,21 +34,19 @@ import java.util.stream.IntStream;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.FileSlice;
|
import org.apache.hudi.common.model.FileSlice;
|
||||||
import org.apache.hudi.common.model.HoodieFileGroupId;
|
import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||||
import org.apache.hudi.common.model.HoodieLogFile;
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.util.CompactionTestUtils.TestHoodieDataFile;
|
import org.apache.hudi.common.util.CompactionTestUtils.TestHoodieDataFile;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class TestCompactionUtils {
|
public class TestCompactionUtils extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
|
|
||||||
@@ -57,17 +54,11 @@ public class TestCompactionUtils {
|
|||||||
new ImmutableMap.Builder<String, Double>()
|
new ImmutableMap.Builder<String, Double>()
|
||||||
.put("key1", 1.0)
|
.put("key1", 1.0)
|
||||||
.put("key2", 3.0).build();
|
.put("key2", 3.0).build();
|
||||||
@Rule
|
|
||||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
private HoodieTableMetaClient metaClient;
|
|
||||||
private String basePath;
|
|
||||||
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> metrics;
|
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> metrics;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
metaClient = HoodieTestUtils.init(getDefaultHadoopConf(),
|
initMetaClient();
|
||||||
tmpFolder.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
|
|
||||||
basePath = metaClient.getBasePath();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -237,4 +228,9 @@ public class TestCompactionUtils {
|
|||||||
});
|
});
|
||||||
Assert.assertEquals("Metrics set", metrics, op.getMetrics());
|
Assert.assertEquals("Metrics set", metrics, op.getMetrics());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected HoodieTableType getTableType() {
|
||||||
|
return HoodieTableType.MERGE_ON_READ;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,23 +32,29 @@ import java.util.regex.Pattern;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieLogFile;
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class TestFSUtils {
|
public class TestFSUtils extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
initMetaClient();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMakeDataFileName() {
|
public void testMakeDataFileName() {
|
||||||
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||||
@@ -75,13 +81,9 @@ public class TestFSUtils {
|
|||||||
* This code tests the fix by ensuring ".hoodie" and their subfolders are never processed.
|
* This code tests the fix by ensuring ".hoodie" and their subfolders are never processed.
|
||||||
*/
|
*/
|
||||||
public void testProcessFiles() throws Exception {
|
public void testProcessFiles() throws Exception {
|
||||||
TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
tmpFolder.create();
|
|
||||||
// All directories including marker dirs.
|
// All directories including marker dirs.
|
||||||
List<String> folders = Arrays.asList("2016/04/15", "2016/05/16", ".hoodie/.temp/2/2016/04/15",
|
List<String> folders = Arrays.asList("2016/04/15", "2016/05/16", ".hoodie/.temp/2/2016/04/15",
|
||||||
".hoodie/.temp/2/2016/05/16");
|
".hoodie/.temp/2/2016/05/16");
|
||||||
HoodieTableMetaClient metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
|
|
||||||
String basePath = metaClient.getBasePath();
|
|
||||||
folders.stream().forEach(f -> {
|
folders.stream().forEach(f -> {
|
||||||
try {
|
try {
|
||||||
metaClient.getFs().mkdirs(new Path(new Path(basePath), f));
|
metaClient.getFs().mkdirs(new Path(new Path(basePath), f));
|
||||||
|
|||||||
@@ -27,15 +27,14 @@ import java.io.ByteArrayInputStream;
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class TestFileIOUtils {
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestFileIOUtils extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMkdirAndDelete() throws IOException {
|
public void testMkdirAndDelete() throws IOException {
|
||||||
TemporaryFolder folder = new TemporaryFolder();
|
|
||||||
folder.create();
|
|
||||||
try {
|
try {
|
||||||
FileIOUtils.mkdir(folder.getRoot());
|
FileIOUtils.mkdir(folder.getRoot());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ package org.apache.hudi.common.util;
|
|||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -34,6 +33,7 @@ import org.apache.avro.generic.GenericRecord;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.avro.HoodieAvroWriteSupport;
|
import org.apache.hudi.avro.HoodieAvroWriteSupport;
|
||||||
import org.apache.hudi.common.BloomFilter;
|
import org.apache.hudi.common.BloomFilter;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||||
@@ -41,19 +41,12 @@ import org.apache.parquet.hadoop.ParquetWriter;
|
|||||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class TestParquetUtils {
|
public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
|
|
||||||
private String basePath;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() {
|
||||||
// Create a temp folder as the base path
|
initPath();
|
||||||
TemporaryFolder folder = new TemporaryFolder();
|
|
||||||
folder.create();
|
|
||||||
basePath = folder.getRoot().getAbsolutePath();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -19,7 +19,6 @@
|
|||||||
package org.apache.hudi.common.util;
|
package org.apache.hudi.common.util;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@@ -33,14 +32,21 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
|||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestRocksDBManager {
|
public class TestRocksDBManager {
|
||||||
|
|
||||||
private static RocksDBDAO dbManager;
|
private static RocksDBDAO dbManager;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpClass() {
|
||||||
|
dbManager = new RocksDBDAO("/dummy/path",
|
||||||
|
FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void drop() throws IOException {
|
public static void tearDownClass() {
|
||||||
if (dbManager != null) {
|
if (dbManager != null) {
|
||||||
dbManager.close();
|
dbManager.close();
|
||||||
dbManager = null;
|
dbManager = null;
|
||||||
@@ -66,8 +72,6 @@ public class TestRocksDBManager {
|
|||||||
return new Payload(prefix, key, val, family);
|
return new Payload(prefix, key, val, family);
|
||||||
}).collect(Collectors.toList());
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
dbManager = new RocksDBDAO("/dummy/path",
|
|
||||||
FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
|
|
||||||
colFamilies.stream().forEach(family -> dbManager.dropColumnFamily(family));
|
colFamilies.stream().forEach(family -> dbManager.dropColumnFamily(family));
|
||||||
colFamilies.stream().forEach(family -> dbManager.addColumnFamily(family));
|
colFamilies.stream().forEach(family -> dbManager.addColumnFamily(family));
|
||||||
|
|
||||||
|
|||||||
@@ -35,8 +35,8 @@ import java.util.stream.Collectors;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.AvroBinaryTestPayload;
|
import org.apache.hudi.common.model.AvroBinaryTestPayload;
|
||||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
@@ -47,19 +47,20 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||||
import org.apache.hudi.common.util.SpillableMapTestUtils;
|
import org.apache.hudi.common.util.SpillableMapTestUtils;
|
||||||
import org.apache.hudi.common.util.SpillableMapUtils;
|
import org.apache.hudi.common.util.SpillableMapUtils;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestDiskBasedMap {
|
public class TestDiskBasedMap extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static final String BASE_OUTPUT_PATH = "/tmp/";
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
initPath();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleInsert() throws IOException, URISyntaxException {
|
public void testSimpleInsert() throws IOException, URISyntaxException {
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
DiskBasedMap records = new DiskBasedMap<>(basePath);
|
||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
|
||||||
|
|
||||||
DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH);
|
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
||||||
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
|
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
|
||||||
@@ -77,10 +78,7 @@ public class TestDiskBasedMap {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
|
public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
|
||||||
Schema schema = getSimpleSchema();
|
DiskBasedMap records = new DiskBasedMap<>(basePath);
|
||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
|
||||||
|
|
||||||
DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH);
|
|
||||||
List<HoodieRecord> hoodieRecords = SchemaTestUtil
|
List<HoodieRecord> hoodieRecords = SchemaTestUtil
|
||||||
.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
|
.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
|
||||||
Set<String> recordKeys = new HashSet<>();
|
Set<String> recordKeys = new HashSet<>();
|
||||||
@@ -102,11 +100,9 @@ public class TestDiskBasedMap {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleUpsert() throws IOException, URISyntaxException {
|
public void testSimpleUpsert() throws IOException, URISyntaxException {
|
||||||
|
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
|
||||||
|
|
||||||
DiskBasedMap records = new DiskBasedMap<>(BASE_OUTPUT_PATH);
|
DiskBasedMap records = new DiskBasedMap<>(basePath);
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
|
|
||||||
// perform some inserts
|
// perform some inserts
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
|
|||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UncheckedIOException;
|
import java.io.UncheckedIOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
@@ -32,6 +31,7 @@ import java.util.List;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
import org.apache.hudi.common.model.HoodieAvroPayload;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
@@ -43,23 +43,20 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
|
|||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||||
import org.apache.hudi.common.util.SpillableMapTestUtils;
|
import org.apache.hudi.common.util.SpillableMapTestUtils;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.Before;
|
||||||
import org.junit.FixMethodOrder;
|
import org.junit.FixMethodOrder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runners.MethodSorters;
|
import org.junit.runners.MethodSorters;
|
||||||
|
|
||||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||||
public class TestExternalSpillableMap {
|
public class TestExternalSpillableMap extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static final String FAILURE_OUTPUT_PATH = "/tmp/test_fail";
|
private static String failureOutputPath;
|
||||||
private static final String BASE_OUTPUT_PATH = "/tmp/";
|
|
||||||
|
|
||||||
@BeforeClass
|
@Before
|
||||||
public static void cleanUp() {
|
public void setUp() {
|
||||||
File file = new File(BASE_OUTPUT_PATH);
|
initPath();
|
||||||
file.delete();
|
failureOutputPath = basePath + "/test_fail";
|
||||||
file = new File(FAILURE_OUTPUT_PATH);
|
|
||||||
file.delete();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -67,7 +64,7 @@ public class TestExternalSpillableMap {
|
|||||||
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||||
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
|
new ExternalSpillableMap<>(16L, basePath,
|
||||||
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
||||||
|
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
@@ -89,7 +86,7 @@ public class TestExternalSpillableMap {
|
|||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||||
|
|
||||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||||
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
|
new ExternalSpillableMap<>(16L, basePath,
|
||||||
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
||||||
|
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
@@ -127,7 +124,7 @@ public class TestExternalSpillableMap {
|
|||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||||
|
|
||||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||||
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
|
new ExternalSpillableMap<>(16L, basePath,
|
||||||
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
||||||
|
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
@@ -180,10 +177,9 @@ public class TestExternalSpillableMap {
|
|||||||
@Test(expected = IOException.class)
|
@Test(expected = IOException.class)
|
||||||
public void simpleTestWithException() throws IOException, URISyntaxException {
|
public void simpleTestWithException() throws IOException, URISyntaxException {
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
|
||||||
|
|
||||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||||
new ExternalSpillableMap<>(16L, FAILURE_OUTPUT_PATH,
|
new ExternalSpillableMap<>(16L, failureOutputPath,
|
||||||
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
||||||
|
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
@@ -202,7 +198,7 @@ public class TestExternalSpillableMap {
|
|||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||||
|
|
||||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||||
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
|
new ExternalSpillableMap<>(16L, basePath,
|
||||||
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
||||||
|
|
||||||
List<String> recordKeys = new ArrayList<>();
|
List<String> recordKeys = new ArrayList<>();
|
||||||
@@ -255,7 +251,7 @@ public class TestExternalSpillableMap {
|
|||||||
String payloadClazz = HoodieAvroPayload.class.getName();
|
String payloadClazz = HoodieAvroPayload.class.getName();
|
||||||
|
|
||||||
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records =
|
||||||
new ExternalSpillableMap<>(16L, BASE_OUTPUT_PATH,
|
new ExternalSpillableMap<>(16L, basePath,
|
||||||
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema)); //16B
|
||||||
|
|
||||||
List<String> recordKeys = new ArrayList<>();
|
List<String> recordKeys = new ArrayList<>();
|
||||||
|
|||||||
@@ -25,20 +25,25 @@ import java.util.Iterator;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||||
import org.apache.hudi.common.util.SpillableMapTestUtils;
|
import org.apache.hudi.common.util.SpillableMapTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestRocksDbBasedMap {
|
public class TestRocksDbBasedMap extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static final String BASE_OUTPUT_PATH = "/tmp/";
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
initPath();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimple() throws IOException, URISyntaxException {
|
public void testSimple() throws IOException, URISyntaxException {
|
||||||
RocksDBBasedMap records = new RocksDBBasedMap(BASE_OUTPUT_PATH);
|
RocksDBBasedMap records = new RocksDBBasedMap(basePath);
|
||||||
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
||||||
((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
||||||
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
|
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
|
||||||
|
|||||||
@@ -25,26 +25,21 @@ import java.io.File;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class TestHoodieROTablePathFilter {
|
public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
|
||||||
|
|
||||||
private HoodieTableMetaClient metaClient;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
this.metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
|
initMetaClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -105,5 +100,7 @@ public class TestHoodieROTablePathFilter {
|
|||||||
path = basePath + File.separator + "nonhoodiefolder/somefile";
|
path = basePath + File.separator + "nonhoodiefolder/somefile";
|
||||||
new File(path).createNewFile();
|
new File(path).createNewFile();
|
||||||
assertTrue(pathFilter.accept(new Path("file:///" + path)));
|
assertTrue(pathFilter.accept(new Path("file:///" + path)));
|
||||||
|
|
||||||
|
folder.delete();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import java.io.IOException;
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.HoodieTestDataGenerator;
|
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.util.FSUtils;
|
import org.apache.hudi.common.util.FSUtils;
|
||||||
@@ -35,9 +36,8 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
|
|
||||||
public class TestHoodieSnapshotCopier {
|
public class TestHoodieSnapshotCopier extends HoodieCommonTestHarness {
|
||||||
|
|
||||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
|
|
||||||
@@ -49,23 +49,17 @@ public class TestHoodieSnapshotCopier {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException {
|
||||||
try {
|
// Prepare directories
|
||||||
// Prepare directories
|
rootPath = "file://" + folder.getRoot().getAbsolutePath();
|
||||||
TemporaryFolder folder = new TemporaryFolder();
|
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
|
||||||
folder.create();
|
outputPath = rootPath + "/output";
|
||||||
rootPath = "file://" + folder.getRoot().getAbsolutePath();
|
|
||||||
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
|
|
||||||
outputPath = rootPath + "/output";
|
|
||||||
|
|
||||||
final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
|
final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
|
||||||
fs = FSUtils.getFs(basePath, hadoopConf);
|
fs = FSUtils.getFs(basePath, hadoopConf);
|
||||||
HoodieTestUtils.init(hadoopConf, basePath);
|
HoodieTestUtils.init(hadoopConf, basePath);
|
||||||
// Start a local Spark job
|
// Start a local Spark job
|
||||||
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
|
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
|
||||||
jsc = new JavaSparkContext(conf);
|
jsc = new JavaSparkContext(conf);
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user