[HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)
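This change makes the hoodie-client tests share a single pattern for refreshing table metadata: HoodieClientTestHarness now owns a protected metaClient field (set up via initMetaClient() and torn down via cleanupMetaClient()), and tests refresh it through the new static factory HoodieTableMetaClient.reload(...) instead of constructing a new HoodieTableMetaClient by hand. A minimal before/after sketch of the pattern (the surrounding test body is illustrative only; the names follow the diff below):

    // Before: each test rebuilt the meta client from the Hadoop conf and base path.
    // HoodieTableMetaClient metaClient =
    //     new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    // HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

    // After: the harness field is re-read from storage via the new factory, which
    // reuses the old client's Hadoop conf, base path and timeline-loading settings.
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);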
@@ -55,6 +55,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
protected TemporaryFolder folder = null;
protected transient HoodieTestDataGenerator dataGen = null;
protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient;

//dfs
protected String dfsBasePath;
@@ -72,7 +73,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
initSparkContexts();
initTestDataGenerator();
initFileSystem();
initTableType();
initMetaClient();
}

/**
@@ -80,7 +81,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
* @throws IOException
*/
public void cleanupResources() throws IOException {
cleanupTableType();
cleanupMetaClient();
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
@@ -191,7 +192,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
*
* @throws IOException
*/
protected void initTableType() throws IOException {
protected void initMetaClient() throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
@@ -200,14 +201,14 @@ public abstract class HoodieClientTestHarness implements Serializable {
throw new IllegalStateException("The Spark context has not been initialized.");
}

HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
}

/**
* Cleanups table type.
*/
protected void cleanupTableType() {

protected void cleanupMetaClient() {
metaClient = null;
}

/**
@@ -114,7 +114,7 @@ public class TestCleaner extends TestHoodieClientBase {
assertNoWriteErrors(statuses);

// verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
// Should have 100 records in table (check using Index), all in locations marked at commit
@@ -200,8 +200,8 @@ public class TestCleaner extends TestHoodieClientBase {
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);

Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView();
Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst()
@@ -239,8 +239,8 @@ public class TestCleaner extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);

metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();

TableFileSystemView fsView = table.getFileSystemView();
@@ -375,8 +375,8 @@ public class TestCleaner extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);

HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
@@ -424,9 +424,8 @@ public class TestCleaner extends TestHoodieClientBase {
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -442,8 +441,8 @@ public class TestCleaner extends TestHoodieClientBase {

// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -472,8 +471,8 @@ public class TestCleaner extends TestHoodieClientBase {

// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -578,9 +577,8 @@ public class TestCleaner extends TestHoodieClientBase {
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");

HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -596,8 +594,8 @@ public class TestCleaner extends TestHoodieClientBase {

// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -626,8 +624,8 @@ public class TestCleaner extends TestHoodieClientBase {

// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -646,8 +644,8 @@ public class TestCleaner extends TestHoodieClientBase {

// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "003");
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
@@ -700,9 +698,8 @@ public class TestCleaner extends TestHoodieClientBase {
assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

table.rollback(jsc, "000", true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
@@ -722,9 +719,8 @@ public class TestCleaner extends TestHoodieClientBase {
// with just some commit metadata, but no data/partitionPaths.
HoodieTestUtils.createCommitFiles(basePath, "000");

HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty());
@@ -783,9 +779,8 @@ public class TestCleaner extends TestHoodieClientBase {
updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003");
updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003");

HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

assertEquals(100,
@@ -890,9 +885,8 @@ public class TestCleaner extends TestHoodieClientBase {
for (int j = 1; j <= i; j++) {
if (j == i && j <= maxNumFileIdsForCompaction) {
expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -934,15 +928,13 @@ public class TestCleaner extends TestHoodieClientBase {
}

// Clean now
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

// Test for safety
final HoodieTable hoodieTable = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
jsc);
final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> {
String fileId = entry.getKey();
@@ -961,7 +953,7 @@ public class TestCleaner extends TestHoodieClientBase {
// Test for progress (Did we clean some files ?)
long numFilesUnderCompactionDeleted =
hoodieCleanStats.stream().flatMap(cleanStat -> {
return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map(
return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()).map(
fileIdWithCommitTime -> {
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
Assert.assertTrue("Deleted instant time must be less than pending compaction",
@@ -97,7 +97,7 @@ public class TestClientRollback extends TestHoodieClientBase {
assertNoWriteErrors(statuses);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(),
getConfig().shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view1 = table.getROFileSystemView();

@@ -122,7 +122,7 @@ public class TestClientRollback extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view2 = table.getROFileSystemView();

@@ -143,7 +143,7 @@ public class TestClientRollback extends TestHoodieClientBase {
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.rollbackToSavepoint(savepoint.getTimestamp());

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view3 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> {
@@ -53,14 +53,14 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
initTempFolderAndPath();
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
// Initialize table
initTableType();
initMetaClient();
}

@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTableType();
cleanupMetaClient();
if (utility != null) {
utility.shutdownMiniCluster();
}
@@ -40,7 +40,6 @@ import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -104,9 +103,8 @@ public class TestHbaseIndex extends HoodieClientTestHarness {

// Create a temp folder as the base path
initTempFolderAndPath();
// Initialize table
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
initTestDataGenerator();
initMetaClient();
}

@After
@@ -114,6 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTestDataGenerator();
cleanupMetaClient();
}

private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -132,7 +131,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
HBaseIndex index = new HBaseIndex(config);
try (HoodieWriteClient writeClient = getWriteClient(config);) {
writeClient.startCommit();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Test tagLocation without any entries in index
@@ -151,7 +150,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200);
@@ -173,7 +172,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
HBaseIndex index = new HBaseIndex(config);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
writeClient.startCommit();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -185,7 +184,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10);
@@ -205,7 +204,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Insert 200 records
@@ -257,7 +256,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Insert 250 records
@@ -282,7 +281,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Insert 200 records
@@ -36,14 +36,14 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestHoodieIndex");
initTempFolderAndPath();
initTableType();
initMetaClient();
}

@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTableType();
cleanupMetaClient();
}

@Test
@@ -44,7 +44,6 @@ import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -92,10 +91,10 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
initSparkContexts("TestHoodieBloomIndex");
initTempFolderAndPath();
initFileSystem();
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
initMetaClient();
}

@After
@@ -103,6 +102,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
cleanupSparkContexts();
cleanupFileSystem();
cleanupTempFolderAndPath();
cleanupMetaClient();
}

private HoodieWriteConfig makeConfig() {
@@ -163,8 +163,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
false);

List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
// Still 0, as no valid commit
assertEquals(filesList.size(), 0);
@@ -174,7 +174,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();

table = HoodieTable.getHoodieTable(metadata, config, jsc);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
assertEquals(filesList.size(), 4);

@@ -286,9 +286,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// We have some records to be tagged (two different partitions)
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
// Also create the metadata and config
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -331,9 +331,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));

// Also create the metadata and config
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -353,8 +353,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);

// We do the tag again
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);

@@ -401,9 +401,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4));

// Also create the metadata and config
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -424,8 +424,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);

// We do the tag again
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);

// Check results
@@ -473,9 +473,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {

// We do the tag
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
@@ -40,7 +40,6 @@ import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -66,16 +65,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestHoodieGlobalBloomIndex");
initTempFolderAndPath();
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
initMetaClient();
}

@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupMetaClient();
}

@Test
@@ -127,8 +127,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {

// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// partitions will NOT be respected by this loadInvolvedFiles(...) call
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
// Still 0, as no valid commit
@@ -139,7 +139,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();

table = HoodieTable.getHoodieTable(metadata, config, jsc);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
assertEquals(filesList.size(), 4);

@@ -264,8 +264,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);

// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

// Add some commits
@@ -54,6 +54,7 @@ import org.junit.Test;
public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

private Configuration hadoopConf;
private HoodieTableMetaClient metaClient;

@Before
public void init() throws Exception {
@@ -63,7 +64,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
hadoopConf = dfs.getConf();
jsc.hadoopConfiguration().addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath));
HoodieTestUtils.init(hadoopConf, basePath);
metaClient = HoodieTestUtils.init(hadoopConf, basePath);
}

@After
@@ -78,8 +79,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").build();
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true));
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
}
@@ -135,7 +136,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -158,8 +159,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
// verify in-flight instants before archive
verifyInflightInstants(metaClient, 3);

HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(dfs.getConf(), basePath, true));
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);

assertTrue(archiveLog.archiveIfRequired(jsc));

@@ -235,7 +236,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -302,7 +303,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -328,7 +329,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -360,7 +361,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
@@ -52,6 +52,7 @@ import org.junit.Test;
public class TestHoodieCompactor extends HoodieClientTestHarness {

private Configuration hadoopConf;
private HoodieTableMetaClient metaClient;

@Before
public void setUp() throws Exception {
@@ -62,7 +63,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
initTempFolderAndPath();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
initTestDataGenerator();
}

@@ -96,9 +97,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {

@Test(expected = HoodieNotSupportedException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);

metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
@@ -106,8 +105,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {

@Test
public void testCompactionEmpty() throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = getConfig();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
try (HoodieWriteClient writeClient = getWriteClient(config);) {

@@ -136,7 +135,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();

// Update all the 100 records
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

newCommitTime = "101";
@@ -153,7 +152,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
updatedRecords);

// Verify that all data file has one log file
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles = table.getRTFileSystemView().getLatestFileSlices(partitionPath)
@@ -164,7 +163,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
}

// Do a compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);

String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
@@ -58,17 +58,17 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
initSparkContexts("TestHoodieMergeHandle");
initTempFolderAndPath();
initFileSystem();
initTableType();
initTestDataGenerator();
initMetaClient();
}

@After
public void tearDown() throws Exception {
cleanupTableType();
cleanupFileSystem();
cleanupTestDataGenerator();
cleanupTempFolderAndPath();
cleanupSparkContexts();
cleanupMetaClient();
}

private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -109,7 +109,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);

// verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
@@ -137,7 +137,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);

// verify that there are 2 commits
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
.countInstants());
@@ -161,7 +161,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);

// verify that there are now 3 commits
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
.countInstants());
@@ -259,7 +259,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
.map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 100);

// Update all the 100 records
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

newCommitTime = "101";
@@ -73,7 +73,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestCopyOnWriteTable");
initTempFolderAndPath();
initTableType();
initMetaClient();
initTestDataGenerator();
initFileSystem();
}
@@ -82,7 +82,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTableType();
cleanupMetaClient();
cleanupFileSystem();
cleanupTestDataGenerator();
}
@@ -94,7 +94,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {

String commitTime = HoodieTestUtils.makeNewCommitTime();
HoodieWriteConfig config = makeHoodieClientConfig();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -127,7 +127,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfig();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);

String partitionPath = "/2016/01/31";
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
@@ -203,7 +203,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {

Thread.sleep(1000);
String newCommitTime = HoodieTestUtils.makeNewCommitTime();
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
List<WriteStatus> statuses =
jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -271,7 +271,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class)
.build();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);

HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);

@@ -308,8 +308,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void testInsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
String commitTime = HoodieTestUtils.makeNewCommitTime();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
metaClient = HoodieTableMetaClient.reload(metaClient);

// Case 1:
// 10 records for partition 1, 1 record for partition 2.
@@ -362,7 +362,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
HoodieStorageConfig.newBuilder().limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024)
.build()).build();
String commitTime = HoodieTestUtils.makeNewCommitTime();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);

List<HoodieRecord> records = new ArrayList<>();
@@ -401,8 +401,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {

HoodieClientTestUtils.fakeCommitFile(basePath, "001");
HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);

HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);

HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
@@ -476,7 +475,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
String commitTime = "000";
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
@@ -487,7 +486,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {

WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);

final List<HoodieRecord> updates =
@@ -156,7 +156,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
@@ -173,7 +173,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertTrue(dataFilesToRead.findAny().isPresent());

// verify that there is a commit
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
@@ -270,7 +270,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
@@ -335,7 +335,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
//rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
@@ -454,7 +454,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
.contains(commitTime2)).collect(Collectors.toList()).size(), 0);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
@@ -477,14 +477,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);

String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());

allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList());
@@ -504,7 +504,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
thirdClient.rollback(compactedCommitTime);

allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

@@ -603,7 +603,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);

String compactionInstantTime = "004";
allCommits.add(compactionInstantTime);
@@ -626,7 +626,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);

compactionInstantTime = "006";
allCommits.add(compactionInstantTime);
@@ -635,7 +635,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
client.commitCompaction(compactionInstantTime, ws, Option.empty());

allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant()
@@ -669,7 +669,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Rollback latest commit first
client.restoreToInstant("000");

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
@@ -754,7 +754,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());
@@ -811,7 +811,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
HoodieTestDataGenerator.avroSchemaWithMetadataFields, updatedRecords);

// Verify that all data file has one log file
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
((SyncableFileSystemView) (table.getRTFileSystemView())).reset();
@@ -833,7 +833,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);

// Verify that recently written compacted data file has no log file
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();

@@ -949,7 +949,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
.copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
@@ -71,6 +71,7 @@ public class HoodieTableMetaClient implements Serializable {
private String basePath;
private transient HoodieWrapperFileSystem fs;
private String metaPath;
private boolean loadActiveTimelineOnLoad;
private SerializableConfiguration hadoopConf;
private HoodieTableType tableType;
private HoodieTableConfig tableConfig;
@@ -104,6 +105,7 @@ public class HoodieTableMetaClient implements Serializable {
this.tableConfig = new HoodieTableConfig(fs, metaPath);
this.tableType = tableConfig.getTableType();
log.info("Finished Loading Table of type " + tableType + " from " + basePath);
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
if (loadActiveTimelineOnLoad) {
log.info("Loading Active commit timeline for " + basePath);
getActiveTimeline();
@@ -118,6 +120,14 @@ public class HoodieTableMetaClient implements Serializable {
public HoodieTableMetaClient() {
}

public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
return new HoodieTableMetaClient(
oldMetaClient.hadoopConf.get(),
oldMetaClient.basePath,
oldMetaClient.loadActiveTimelineOnLoad,
oldMetaClient.consistencyGuardConfig);
}

/**
* This method is only used when this object is deserialized in a spark executor.
*