1
0

[HUDI-886] Replace jsc.hadoopConfiguration with hadoop configuration in hudi-client testcase (#1621)

This commit is contained in:
Shen Hong
2020-05-12 23:51:31 +08:00
committed by GitHub
parent 295d00beea
commit b54517aad0
12 changed files with 78 additions and 78 deletions

View File

@@ -249,7 +249,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
return (commit, numRecords) -> { return (commit, numRecords) -> {
final HoodieIndex index = HoodieIndex.createIndex(writeConfig); final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords); List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf); HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf);
JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
return taggedRecords.collect(); return taggedRecords.collect();
@@ -270,7 +270,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
return (numRecords) -> { return (numRecords) -> {
final HoodieIndex index = HoodieIndex.createIndex(writeConfig); final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
List<HoodieKey> records = keyGenFunction.apply(numRecords); List<HoodieKey> records = keyGenFunction.apply(numRecords);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf); HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf);
JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1) JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
@@ -467,7 +467,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
assertPartitionMetadataForRecords(records, fs); assertPartitionMetadataForRecords(records, fs);
// verify that there is a commit // verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) { if (assertForCommit) {
@@ -535,7 +535,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
assertPartitionMetadataForKeys(keysToDelete, fs); assertPartitionMetadataForKeys(keysToDelete, fs);
// verify that there is a commit // verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) { if (assertForCommit) {

View File

@@ -478,7 +478,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added."); assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId(); String file1 = statuses.get(0).getFileId();
assertEquals(100, assertEquals(100,
readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records"); .size(), "file should contain 100 records");
// Update + Inserts such that they just expand file1 // Update + Inserts such that they just expand file1
@@ -498,10 +498,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded"); assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records"); "file should contain 140 records");
List<GenericRecord> records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) { for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2"); assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2");
@@ -521,7 +521,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed."); assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = getHoodieTable(metadata, config); HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView(); BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -532,7 +532,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
for (HoodieBaseFile file : files) { for (HoodieBaseFile file : files) {
if (file.getFileName().contains(file1)) { if (file.getFileName().contains(file1)) {
assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded"); assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded");
records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
for (GenericRecord record : records) { for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -548,7 +548,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly"); assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly");
} else { } else {
assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3"); assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3");
records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
for (GenericRecord record : records) { for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
@@ -589,7 +589,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added."); assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId(); String file1 = statuses.get(0).getFileId();
assertEquals(100, assertEquals(100,
readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records"); .size(), "file should contain 100 records");
// Second, set of Inserts should just expand file1 // Second, set of Inserts should just expand file1
@@ -605,10 +605,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded"); assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records"); "file should contain 140 records");
List<GenericRecord> records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) { for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -627,7 +627,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed."); assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = getHoodieTable(metaClient, config); HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView() List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList()); .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
@@ -636,7 +636,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
int totalInserts = 0; int totalInserts = 0;
for (HoodieBaseFile file : files) { for (HoodieBaseFile file : files) {
assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
totalInserts += records.size(); totalInserts += records.size();
} }
assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(), assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(),
@@ -670,7 +670,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added."); assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId(); String file1 = statuses.get(0).getFileId();
assertEquals(100, assertEquals(100,
readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records"); .size(), "file should contain 100 records");
// Delete 20 among 100 inserted // Delete 20 among 100 inserted
@@ -763,10 +763,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(exepctedRecords, assertEquals(exepctedRecords,
readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 110 records"); "file should contain 110 records");
List<GenericRecord> records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) { for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime); assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime);
@@ -808,7 +808,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
String instantTime = "000"; String instantTime = "000";
@@ -855,7 +855,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg); HoodieWriteClient client = getHoodieWriteClient(cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
String instantTime = "000"; String instantTime = "000";
client.startCommitWithTime(instantTime); client.startCommitWithTime(instantTime);
@@ -926,7 +926,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
*/ */
@Test @Test
public void testConsistencyCheckDuringFinalize() throws Exception { public void testConsistencyCheckDuringFinalize() throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
String instantTime = "000"; String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg); HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -944,7 +944,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
@Test @Test
public void testRollbackAfterConsistencyCheckFailure() throws Exception { public void testRollbackAfterConsistencyCheckFailure() throws Exception {
String instantTime = "000"; String instantTime = "000";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg); HoodieWriteClient client = getHoodieWriteClient(cfg);
testConsistencyCheck(metaClient, instantTime); testConsistencyCheck(metaClient, instantTime);

View File

@@ -81,7 +81,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
@Test @Test
public void readLocalWriteHDFS() throws Exception { public void readLocalWriteHDFS() throws Exception {
// Initialize table and filesystem // Initialize table and filesystem
HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.valueOf(tableType), HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, HoodieTableType.valueOf(tableType),
tableName, HoodieAvroPayload.class.getName()); tableName, HoodieAvroPayload.class.getName());
// Create write client to write some records in // Create write client to write some records in
@@ -106,7 +106,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
assertEquals(readRecords.count(), records.size(), "Should contain 100 records"); assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
// Write to local // Write to local
HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
tableName, HoodieAvroPayload.class.getName()); tableName, HoodieAvroPayload.class.getName());
String writeCommitTime = localWriteClient.startCommit(); String writeCommitTime = localWriteClient.startCommit();

View File

@@ -144,7 +144,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
throw new IllegalStateException("The Spark context has not been initialized."); throw new IllegalStateException("The Spark context has not been initialized.");
} }
initFileSystemWithConfiguration(jsc.hadoopConfiguration()); initFileSystemWithConfiguration(hadoopConf);
} }
/** /**
@@ -181,7 +181,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
throw new IllegalStateException("The Spark context has not been initialized."); throw new IllegalStateException("The Spark context has not been initialized.");
} }
metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType());
} }
/** /**

View File

@@ -112,7 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
public void setUp() throws Exception { public void setUp() throws Exception {
// Initialize a local spark env // Initialize a local spark env
initSparkContexts("TestHbaseIndex"); initSparkContexts("TestHbaseIndex");
jsc.hadoopConfiguration().addResource(utility.getConfiguration()); hadoopConf.addResource(utility.getConfiguration());
// Create a temp folder as the base path // Create a temp folder as the base path
initPath(); initPath();

View File

@@ -259,7 +259,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
List<String> uuids = List<String> uuids =
Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey()); Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
List<String> results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(), uuids, List<String> results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
new Path(basePath + "/2016/01/31/" + filename)); new Path(basePath + "/2016/01/31/" + filename));
assertEquals(results.size(), 2); assertEquals(results.size(), 2);
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")

View File

@@ -62,7 +62,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
initPath(); initPath();
initSparkContexts("TestHoodieCommitArchiveLog"); initSparkContexts("TestHoodieCommitArchiveLog");
hadoopConf = dfs.getConf(); hadoopConf = dfs.getConf();
jsc.hadoopConfiguration().addResource(dfs.getConf()); hadoopConf.addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath)); dfs.mkdirs(new Path(basePath));
metaClient = HoodieTestUtils.init(hadoopConf, basePath); metaClient = HoodieTestUtils.init(hadoopConf, basePath);
} }

View File

@@ -85,7 +85,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
// Build a write config with bulkinsertparallelism set // Build a write config with bulkinsertparallelism set
HoodieWriteConfig cfg = getConfigBuilder().build(); HoodieWriteConfig cfg = getConfigBuilder().build();
try (HoodieWriteClient client = getWriteClient(cfg);) { try (HoodieWriteClient client = getWriteClient(cfg);) {
FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
/** /**
* Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times

View File

@@ -567,7 +567,7 @@ public class TestCleaner extends TestHoodieClientBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build(); .build();
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
// Make 3 files, one base file and 2 log files associated with base file // Make 3 files, one base file and 2 log files associated with base file
String file1P0 = String file1P0 =
@@ -1010,7 +1010,7 @@ public class TestCleaner extends TestHoodieClientBase {
private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted,
int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException { int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException {
HoodieTableMetaClient metaClient = HoodieTableMetaClient metaClient =
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"}; String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"};
String[] compactionInstants = new String[] {"002", "004", "006", "008", "010"}; String[] compactionInstants = new String[] {"002", "004", "006", "008", "010"};
Map<String, String> expFileIdToPendingCompaction = new HashMap<>(); Map<String, String> expFileIdToPendingCompaction = new HashMap<>();

View File

@@ -100,19 +100,19 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
public void init() throws IOException { public void init() throws IOException {
initDFS(); initDFS();
initSparkContexts("TestHoodieMergeOnReadTable"); initSparkContexts("TestHoodieMergeOnReadTable");
jsc.hadoopConfiguration().addResource(dfs.getConf()); hadoopConf.addResource(dfs.getConf());
initPath(); initPath();
dfs.mkdirs(new Path(basePath)); dfs.mkdirs(new Path(basePath));
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
initTestDataGenerator(); initTestDataGenerator();
// initialize parquet input format // initialize parquet input format
roInputFormat = new HoodieParquetInputFormat(); roInputFormat = new HoodieParquetInputFormat();
roJobConf = new JobConf(jsc.hadoopConfiguration()); roJobConf = new JobConf(hadoopConf);
roInputFormat.setConf(roJobConf); roInputFormat.setConf(roJobConf);
rtInputFormat = new HoodieParquetRealtimeInputFormat(); rtInputFormat = new HoodieParquetRealtimeInputFormat();
rtJobConf = new JobConf(jsc.hadoopConfiguration()); rtJobConf = new JobConf(hadoopConf);
rtInputFormat.setConf(rtJobConf); rtInputFormat.setConf(rtJobConf);
} }
@@ -307,7 +307,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -375,7 +375,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
public void testCOWToMORConvertedTableRollback() throws Exception { public void testCOWToMORConvertedTableRollback() throws Exception {
// Set TableType to COW // Set TableType to COW
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig cfg = getConfig(true); HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getWriteClient(cfg);) { try (HoodieWriteClient client = getWriteClient(cfg);) {
@@ -393,7 +393,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// verify there are no errors // verify there are no errors
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertTrue(commit.isPresent()); assertTrue(commit.isPresent());
assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
@@ -411,7 +411,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
// Set TableType to MOR // Set TableType to MOR
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
// rollback a COW commit when TableType is MOR // rollback a COW commit when TableType is MOR
client.rollback(newCommitTime); client.rollback(newCommitTime);
@@ -448,7 +448,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect(); List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -595,7 +595,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect(); List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -758,7 +758,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -832,7 +832,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
writeClient.insert(recordsRDD, newCommitTime).collect(); writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records // Update all the 100 records
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
newCommitTime = "101"; newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime); writeClient.startCommitWithTime(newCommitTime);
@@ -907,7 +907,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
writeClient.commit(newCommitTime, statuses); writeClient.commit(newCommitTime, statuses);
HoodieTable table = HoodieTable table =
HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView(); SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0; long numLogFiles = 0;
@@ -968,7 +968,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
// and calling rollback twice // and calling rollback twice
final String lastCommitTime = newCommitTime; final String lastCommitTime = newCommitTime;
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieInstant last = metaClient.getCommitsTimeline().getInstants() HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
.filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get(); .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
String fileName = last.getFileName(); String fileName = last.getFileName();
@@ -1017,7 +1017,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
statuses.collect(); statuses.collect();
HoodieTable table = HoodieTable table =
HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView(); SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0; long numLogFiles = 0;
@@ -1038,7 +1038,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction // Trigger a rollback of compaction
writeClient.rollback(newCommitTime); writeClient.rollback(newCommitTime);
table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); table = HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
tableRTFileSystemView = table.getSliceView(); tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset(); ((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant(); Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
@@ -1058,7 +1058,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getWriteClient(cfg);) { try (HoodieWriteClient client = getWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
// Create a commit without rolling stats in metadata to test backwards compatibility // Create a commit without rolling stats in metadata to test backwards compatibility
@@ -1157,7 +1157,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
public void testRollingStatsWithSmallFileHandling() throws Exception { public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getWriteClient(cfg);) { try (HoodieWriteClient client = getWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
Map<String, Long> fileIdToInsertsMap = new HashMap<>(); Map<String, Long> fileIdToInsertsMap = new HashMap<>();
Map<String, Long> fileIdToUpsertsMap = new HashMap<>(); Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -1304,7 +1304,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf); HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -1397,7 +1397,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect(); List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
@@ -1450,7 +1450,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction) private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
throws Exception { throws Exception {
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction); setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction);
FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString()); FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString());
return roInputFormat.listStatus(roJobConf); return roInputFormat.listStatus(roJobConf);
@@ -1463,7 +1463,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull)
throws Exception { throws Exception {
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false); setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString()); FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString());
return rtInputFormat.listStatus(rtJobConf); return rtInputFormat.listStatus(rtJobConf);

View File

@@ -163,13 +163,13 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
// Read out the bloom filter and make sure filter can answer record exist or not // Read out the bloom filter and make sure filter can answer record exist or not
Path parquetFilePath = allFiles[0].getPath(); Path parquetFilePath = allFiles[0].getPath();
BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath); BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath);
for (HoodieRecord record : records) { for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey())); assertTrue(filter.mightContain(record.getRecordKey()));
} }
// Read the parquet file, check the record content // Read the parquet file, check the record content
List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), parquetFilePath); List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath);
GenericRecord newRecord; GenericRecord newRecord;
int index = 0; int index = 0;
for (GenericRecord record : fileRecords) { for (GenericRecord record : fileRecords) {
@@ -205,7 +205,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
// Check whether the record has been updated // Check whether the record has been updated
Path updatedParquetFilePath = allFiles[0].getPath(); Path updatedParquetFilePath = allFiles[0].getPath();
BloomFilter updatedFilter = BloomFilter updatedFilter =
ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), updatedParquetFilePath); ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath);
for (HoodieRecord record : records) { for (HoodieRecord record : records) {
// No change to the _row_key // No change to the _row_key
assertTrue(updatedFilter.mightContain(record.getRecordKey())); assertTrue(updatedFilter.mightContain(record.getRecordKey()));
@@ -234,9 +234,9 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
throws Exception { throws Exception {
// initialize parquet input format // initialize parquet input format
HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat(); HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat();
JobConf jobConf = new JobConf(jsc.hadoopConfiguration()); JobConf jobConf = new JobConf(hadoopConf);
hoodieInputFormat.setConf(jobConf); hoodieInputFormat.setConf(jobConf);
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
setupIncremental(jobConf, startCommitTime, numCommitsToPull); setupIncremental(jobConf, startCommitTime, numCommitsToPull);
FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, partitionPath).toString()); FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, partitionPath).toString());
return hoodieInputFormat.listStatus(jobConf); return hoodieInputFormat.listStatus(jobConf);

View File

@@ -107,7 +107,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Schedule compaction but do not run them // Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant pendingCompactionInstant = HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
@@ -118,14 +118,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Reload and rollback inflight compaction // Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
// hoodieTable.rollback(jsc, // hoodieTable.rollback(jsc,
// new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false); // new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false);
client.rollbackInflightCompaction( client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable); new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline() pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
.getInstants().findFirst().get(); .getInstants().findFirst().get();
assertEquals("compaction", pendingCompactionInstant.getAction()); assertEquals("compaction", pendingCompactionInstant.getAction());
@@ -163,10 +163,10 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Schedule compaction but do not run them // Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true); createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant pendingCompactionInstant = HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
@@ -179,7 +179,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
client.startCommitWithTime(nextInflightInstantTime); client.startCommitWithTime(nextInflightInstantTime);
// Validate // Validate
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get(); inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time"); assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time");
assertEquals(1, metaClient.getActiveTimeline() assertEquals(1, metaClient.getActiveTimeline()
@@ -211,7 +211,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
new ArrayList<>()); new ArrayList<>());
// Schedule and mark compaction instant as inflight // Schedule and mark compaction instant as inflight
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
@@ -244,7 +244,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Schedule compaction but do not run them // Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant pendingCompactionInstant = HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time"); assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
@@ -273,10 +273,10 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>()); new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true); createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant inflightInstant = HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get(); metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time"); assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time");
@@ -338,7 +338,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>()); new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false); scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
} }
@@ -362,7 +362,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>()); new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
@@ -379,7 +379,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
private void validateDeltaCommit(String latestDeltaCommit, private void validateDeltaCommit(String latestDeltaCommit,
final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
HoodieWriteConfig cfg) { HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable table = getHoodieTable(metaClient, cfg); HoodieTable table = getHoodieTable(metaClient, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table); List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
fileSliceList.forEach(fileSlice -> { fileSliceList.forEach(fileSlice -> {
@@ -400,7 +400,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants) List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
throws Exception { throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions(); List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
List<String> gotPendingCompactionInstants = List<String> gotPendingCompactionInstants =
pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList()); pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
@@ -422,7 +422,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
client.commit(firstInstant, statuses); client.commit(firstInstant, statuses);
} }
assertNoWriteErrors(statusList); assertNoWriteErrors(statusList);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg); List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
assertTrue(dataFilesToRead.stream().findAny().isPresent(), assertTrue(dataFilesToRead.stream().findAny().isPresent(),
@@ -433,7 +433,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecords = records.size(); int numRecords = records.size();
for (String instantTime : deltaInstants) { for (String instantTime : deltaInstants) {
records = dataGen.generateUpdates(instantTime, numRecords); records = dataGen.generateUpdates(instantTime, numRecords);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false); createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg); validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
} }
@@ -441,7 +441,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
} }
private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) { private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant); metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants() HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
@@ -452,7 +452,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg) private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
throws IOException { throws IOException {
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get(); HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set"); assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
} }
@@ -484,7 +484,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
} }
// verify that there is a commit // verify that there is a commit
table = getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true), cfg); table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants(); HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertEquals(latestCompactionCommitTime, compactionInstantTime, assertEquals(latestCompactionCommitTime, compactionInstantTime,