diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java index 89f109aa9..c8f9fdd0c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java @@ -249,7 +249,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { return (commit, numRecords) -> { final HoodieIndex index = HoodieIndex.createIndex(writeConfig); List records = recordGenFunction.apply(commit, numRecords); - final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true); HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf); JavaRDD taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); return taggedRecords.collect(); @@ -270,7 +270,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { return (numRecords) -> { final HoodieIndex index = HoodieIndex.createIndex(writeConfig); List records = keyGenFunction.apply(numRecords); - final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true); HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf); JavaRDD recordsToDelete = jsc.parallelize(records, 1) .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); @@ -467,7 +467,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { assertPartitionMetadataForRecords(records, fs); // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); if (assertForCommit) { @@ -535,7 +535,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { assertPartitionMetadataForKeys(keysToDelete, fs); // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); if (assertForCommit) { diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 246ba37e5..223c39db4 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -478,7 +478,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Update + Inserts such that they just expand file1 @@ -498,10 +498,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded"); assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(140, readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), + assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(), "file should contain 140 records"); - List records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); + List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2"); @@ -521,7 +521,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertNoWriteErrors(statuses); assertEquals(2, statuses.size(), "2 files needs to be committed."); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = getHoodieTable(metadata, config); BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView(); @@ -532,7 +532,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { for (HoodieBaseFile file : files) { if (file.getFileName().contains(file1)) { assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded"); - records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); + records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); @@ -548,7 +548,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly"); } else { assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3"); - records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); + records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), @@ -589,7 +589,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Second, set of Inserts should just expand file1 @@ -605,10 +605,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded"); assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(140, readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), + assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(), "file should contain 140 records"); - List records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); + List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); @@ -627,7 +627,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertNoWriteErrors(statuses); assertEquals(2, statuses.size(), "2 files needs to be committed."); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = getHoodieTable(metaClient, config); List files = table.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList()); @@ -636,7 +636,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { int totalInserts = 0; for (HoodieBaseFile file : files) { assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); - records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath())); + records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); totalInserts += records.size(); } assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(), @@ -670,7 +670,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Delete 20 among 100 inserted @@ -763,10 +763,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); assertEquals(exepctedRecords, - readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), + readRowKeysFromParquet(hadoopConf, newFile).size(), "file should contain 110 records"); - List records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); + List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime); @@ -808,7 +808,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); String instantTime = "000"; @@ -855,7 +855,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteClient client = getHoodieWriteClient(cfg); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); String instantTime = "000"; client.startCommitWithTime(instantTime); @@ -926,7 +926,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { */ @Test public void testConsistencyCheckDuringFinalize() throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); String instantTime = "000"; HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteClient client = getHoodieWriteClient(cfg); @@ -944,7 +944,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { @Test public void testRollbackAfterConsistencyCheckFailure() throws Exception { String instantTime = "000"; - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteClient client = getHoodieWriteClient(cfg); testConsistencyCheck(metaClient, instantTime); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 5c470cb69..54e5e8b0a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -81,7 +81,7 @@ public class TestMultiFS extends HoodieClientTestHarness { @Test public void readLocalWriteHDFS() throws Exception { // Initialize table and filesystem - HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.valueOf(tableType), + HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName()); // Create write client to write some records in @@ -106,7 +106,7 @@ public class TestMultiFS extends HoodieClientTestHarness { assertEquals(readRecords.count(), records.size(), "Should contain 100 records"); // Write to local - HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), + HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName()); String writeCommitTime = localWriteClient.startCommit(); diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java index 62d5505d0..2a24c210a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java @@ -144,7 +144,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im throw new IllegalStateException("The Spark context has not been initialized."); } - initFileSystemWithConfiguration(jsc.hadoopConfiguration()); + initFileSystemWithConfiguration(hadoopConf); } /** @@ -181,7 +181,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im throw new IllegalStateException("The Spark context has not been initialized."); } - metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType()); } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java index 42a0b970f..d1122cff5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java @@ -112,7 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { public void setUp() throws Exception { // Initialize a local spark env initSparkContexts("TestHbaseIndex"); - jsc.hadoopConfiguration().addResource(utility.getConfiguration()); + hadoopConf.addResource(utility.getConfiguration()); // Create a temp folder as the base path initPath(); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index a6f69b612..8b938287b 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -259,7 +259,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { List uuids = Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey()); - List results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(), uuids, + List results = HoodieKeyLookupHandle.checkCandidatesAgainstFile(hadoopConf, uuids, new Path(basePath + "/2016/01/31/" + filename)); assertEquals(results.size(), 2); assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index d56f29781..bfd594670 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -62,7 +62,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { initPath(); initSparkContexts("TestHoodieCommitArchiveLog"); hadoopConf = dfs.getConf(); - jsc.hadoopConfiguration().addResource(dfs.getConf()); + hadoopConf.addResource(dfs.getConf()); dfs.mkdirs(new Path(basePath)); metaClient = HoodieTestUtils.init(hadoopConf, basePath); } diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 601f64afe..29adf5efb 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -85,7 +85,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { // Build a write config with bulkinsertparallelism set HoodieWriteConfig cfg = getConfigBuilder().build(); try (HoodieWriteClient client = getWriteClient(cfg);) { - FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); + FileSystem fs = FSUtils.getFs(basePath, hadoopConf); /** * Write 1 (only inserts) This will do a bulk insert of 44 records of which there are 2 records repeated 21 times diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 2efdeb5e9..844551846 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -567,7 +567,7 @@ public class TestCleaner extends TestHoodieClientBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); // Make 3 files, one base file and 2 log files associated with base file String file1P0 = @@ -1010,7 +1010,7 @@ public class TestCleaner extends TestHoodieClientBase { private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDeleted, int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws IOException { HoodieTableMetaClient metaClient = - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); String[] instants = new String[] {"000", "001", "003", "005", "007", "009", "011", "013"}; String[] compactionInstants = new String[] {"002", "004", "006", "008", "010"}; Map expFileIdToPendingCompaction = new HashMap<>(); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index 57f5edbf8..bcdffba87 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -100,19 +100,19 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void init() throws IOException { initDFS(); initSparkContexts("TestHoodieMergeOnReadTable"); - jsc.hadoopConfiguration().addResource(dfs.getConf()); + hadoopConf.addResource(dfs.getConf()); initPath(); dfs.mkdirs(new Path(basePath)); - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); initTestDataGenerator(); // initialize parquet input format roInputFormat = new HoodieParquetInputFormat(); - roJobConf = new JobConf(jsc.hadoopConfiguration()); + roJobConf = new JobConf(hadoopConf); roInputFormat.setConf(roJobConf); rtInputFormat = new HoodieParquetRealtimeInputFormat(); - rtJobConf = new JobConf(jsc.hadoopConfiguration()); + rtJobConf = new JobConf(hadoopConf); rtInputFormat.setConf(rtJobConf); } @@ -307,7 +307,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -375,7 +375,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testCOWToMORConvertedTableRollback() throws Exception { // Set TableType to COW - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); HoodieWriteConfig cfg = getConfig(true); try (HoodieWriteClient client = getWriteClient(cfg);) { @@ -393,7 +393,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // verify there are no errors assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); assertTrue(commit.isPresent()); assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); @@ -411,7 +411,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); // Set TableType to MOR - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); // rollback a COW commit when TableType is MOR client.rollback(newCommitTime); @@ -448,7 +448,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -595,7 +595,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -758,7 +758,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -832,7 +832,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.insert(recordsRDD, newCommitTime).collect(); // Update all the 100 records - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); newCommitTime = "101"; writeClient.startCommitWithTime(newCommitTime); @@ -907,7 +907,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.commit(newCommitTime, statuses); HoodieTable table = - HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); + HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -968,7 +968,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs // and calling rollback twice final String lastCommitTime = newCommitTime; - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieInstant last = metaClient.getCommitsTimeline().getInstants() .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get(); String fileName = last.getFileName(); @@ -1017,7 +1017,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { statuses.collect(); HoodieTable table = - HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); + HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -1038,7 +1038,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); // Trigger a rollback of compaction writeClient.rollback(newCommitTime); - table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); + table = HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf); tableRTFileSystemView = table.getSliceView(); ((SyncableFileSystemView) tableRTFileSystemView).reset(); Option lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant(); @@ -1058,7 +1058,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getWriteClient(cfg);) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); // Create a commit without rolling stats in metadata to test backwards compatibility @@ -1157,7 +1157,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { public void testRollingStatsWithSmallFileHandling() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getWriteClient(cfg);) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); Map fileIdToInsertsMap = new HashMap<>(); Map fileIdToUpsertsMap = new HashMap<>(); @@ -1304,7 +1304,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); @@ -1397,7 +1397,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { List statuses = client.insert(writeRecords, commitTime).collect(); assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); @@ -1450,7 +1450,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { private FileStatus[] getROIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull, boolean stopAtCompaction) throws Exception { - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); setupIncremental(roJobConf, startCommitTime, numCommitsToPull, stopAtCompaction); FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath, partitionPath).toString()); return roInputFormat.listStatus(roJobConf); @@ -1463,7 +1463,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { private FileStatus[] getRTIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) throws Exception { - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false); FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath, partitionPath).toString()); return rtInputFormat.listStatus(rtJobConf); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 54fb555bb..d39df241d 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -163,13 +163,13 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { // Read out the bloom filter and make sure filter can answer record exist or not Path parquetFilePath = allFiles[0].getPath(); - BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), parquetFilePath); + BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath); for (HoodieRecord record : records) { assertTrue(filter.mightContain(record.getRecordKey())); } // Read the parquet file, check the record content - List fileRecords = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), parquetFilePath); + List fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath); GenericRecord newRecord; int index = 0; for (GenericRecord record : fileRecords) { @@ -205,7 +205,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { // Check whether the record has been updated Path updatedParquetFilePath = allFiles[0].getPath(); BloomFilter updatedFilter = - ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(), updatedParquetFilePath); + ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath); for (HoodieRecord record : records) { // No change to the _row_key assertTrue(updatedFilter.mightContain(record.getRecordKey())); @@ -234,9 +234,9 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { throws Exception { // initialize parquet input format HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat(); - JobConf jobConf = new JobConf(jsc.hadoopConfiguration()); + JobConf jobConf = new JobConf(hadoopConf); hoodieInputFormat.setConf(jobConf); - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); setupIncremental(jobConf, startCommitTime, numCommitsToPull); FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, partitionPath).toString()); return hoodieInputFormat.listStatus(jobConf); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index f348f0c92..88fdae80c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -107,7 +107,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Schedule compaction but do not run them scheduleCompaction(compactionInstantTime, client, cfg); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); @@ -118,14 +118,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase { moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); // Reload and rollback inflight compaction - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); // hoodieTable.rollback(jsc, // new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false); client.rollbackInflightCompaction( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline() .getInstants().findFirst().get(); assertEquals("compaction", pendingCompactionInstant.getAction()); @@ -163,10 +163,10 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Schedule compaction but do not run them scheduleCompaction(compactionInstantTime, client, cfg); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), @@ -179,7 +179,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { client.startCommitWithTime(nextInflightInstantTime); // Validate - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get(); assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time"); assertEquals(1, metaClient.getActiveTimeline() @@ -211,7 +211,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { new ArrayList<>()); // Schedule and mark compaction instant as inflight - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); scheduleCompaction(compactionInstantTime, client, cfg); moveCompactionFromRequestedToInflight(compactionInstantTime, cfg); @@ -244,7 +244,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Schedule compaction but do not run them scheduleCompaction(compactionInstantTime, client, cfg); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieInstant pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time"); @@ -273,10 +273,10 @@ public class TestAsyncCompaction extends TestHoodieClientBase { records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieInstant inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get(); assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time"); @@ -338,7 +338,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false); } @@ -362,7 +362,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); scheduleCompaction(compactionInstantTime, client, cfg); @@ -379,7 +379,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { private void validateDeltaCommit(String latestDeltaCommit, final Map> fgIdToCompactionOperation, HoodieWriteConfig cfg) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable table = getHoodieTable(metaClient, cfg); List fileSliceList = getCurrentLatestFileSlices(table); fileSliceList.forEach(fileSlice -> { @@ -400,7 +400,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { List records, HoodieWriteConfig cfg, boolean insertFirst, List expPendingCompactionInstants) throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); List> pendingCompactions = readClient.getPendingCompactions(); List gotPendingCompactionInstants = pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList()); @@ -422,7 +422,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { client.commit(firstInstant, statuses); } assertNoWriteErrors(statusList); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); List dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg); assertTrue(dataFilesToRead.stream().findAny().isPresent(), @@ -433,7 +433,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { int numRecords = records.size(); for (String instantTime : deltaInstants) { records = dataGen.generateUpdates(instantTime, numRecords); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false); validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg); } @@ -441,7 +441,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { } private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant); HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants() @@ -452,7 +452,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg) throws IOException { client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get(); assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set"); } @@ -484,7 +484,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { } // verify that there is a commit - table = getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true), cfg); + table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg); HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants(); String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); assertEquals(latestCompactionCommitTime, compactionInstantTime,