1
0

[HUDI-1315] Adding builder for HoodieTableMetaClient initialization (#2534)

This commit is contained in:
Sivabalan Narayanan
2021-02-19 20:54:26 -05:00
committed by GitHub
parent 0d91c451b0
commit c9fcf964b2
64 changed files with 241 additions and 203 deletions

View File

@@ -97,7 +97,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
this.hadoopConf = context.getHadoopConf().get();
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
this.index = SparkHoodieIndex.createIndex(clientConfig);
this.sqlContextOpt = Option.empty();
@@ -199,7 +199,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
*/
public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build();
return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
.map(
instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))

View File

@@ -42,13 +42,10 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
}
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
context.getHadoopConf().get(),
config.getBasePath(),
true,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
);
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}

View File

@@ -135,7 +135,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
int expNumRepairs) throws Exception {
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
if (expNumRepairs > 0) {
assertTrue(result.stream().anyMatch(r -> !r.isSuccess()), "Expect some failures in validation");
@@ -176,7 +176,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
* @param compactionInstant Compaction Instant
*/
private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Ensure compaction-plan is good to begin with
List<ValidationOpResult> validationResults = client.validateCompactionPlan(metaClient, compactionInstant, 1);
assertFalse(validationResults.stream().anyMatch(v -> !v.isSuccess()),
@@ -234,7 +234,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Log files belonging to file-slices created because of compaction request must be renamed
@@ -270,7 +270,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
@@ -306,7 +306,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles = client
.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Log files belonging to file-slices created because of compaction request must be renamed
@@ -331,7 +331,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
// Call the main unschedule API
client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files

View File

@@ -410,7 +410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
final HoodieWriteConfig cfg = hoodieWriteConfig;
final String instantTime = "007";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
String basePathStr = basePath;
HoodieTable table = getHoodieTable(metaClient, cfg);
jsc.parallelize(Arrays.asList(1)).map(e -> {
@@ -894,7 +894,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -1001,7 +1001,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
+ readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
"file should contain 340 records");
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
@@ -1428,7 +1428,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
String instantTime = "000";
@@ -1533,7 +1533,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
@@ -1559,7 +1559,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
String instantTime = "000";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :

View File

@@ -94,7 +94,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
// Read from hdfs
FileSystem fs = FSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultHadoopConf());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), dfsBasePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(dfsBasePath).build();
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime);
assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
@@ -112,7 +112,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
LOG.info("Reading from path: " + tablePath);
fs = FSUtils.getFs(tablePath, HoodieTestUtils.getDefaultHadoopConf());
metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
Dataset<Row> localReadRecords =
HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);

View File

@@ -118,13 +118,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Metadata table should not exist until created for the first time
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
// Metadata table is not created if disabled by config
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
client.startCommitWithTime("001");
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
}
// Metadata table created when enabled by config & sync is called
@@ -565,8 +565,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
}
HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, config.getBasePath());
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build();
HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
// check that there are compactions.
assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0);
@@ -869,7 +869,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Metadata table should be in sync with the dataset
assertTrue(metadata(client).isInSync());
HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");

View File

@@ -88,7 +88,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
**/
protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable table = getHoodieTable(metaClient, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
fileSliceList.forEach(fileSlice -> {
@@ -109,7 +109,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
List<String> gotPendingCompactionInstants =
pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
@@ -131,7 +131,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
client.commit(firstInstant, statuses);
}
assertNoWriteErrors(statusList);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
assertTrue(dataFilesToRead.stream().findAny().isPresent(),
@@ -142,7 +142,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
int numRecords = records.size();
for (String instantTime : deltaInstants) {
records = dataGen.generateUpdates(instantTime, numRecords);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
}
@@ -150,7 +150,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
}
protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
@@ -160,7 +160,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
protected void scheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) {
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
}
@@ -192,7 +192,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
}
// verify that there is a commit
table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertEquals(latestCompactionCommitTime, compactionInstantTime,
@@ -214,7 +214,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
"Compacted files should not show up in latest slices");
// verify that there is a commit
table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
// verify compaction commit is visible in timeline
assertTrue(timeline.filterCompletedInstants().getInstants()

View File

@@ -75,7 +75,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
@@ -86,12 +86,12 @@ public class TestAsyncCompaction extends CompactionTestBase {
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
.getInstants().findFirst().get();
assertEquals("compaction", pendingCompactionInstant.getAction());
@@ -129,10 +129,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
@@ -145,7 +145,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
client.startCommitWithTime(nextInflightInstantTime);
// Validate
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time");
assertEquals(1, metaClient.getActiveTimeline()
@@ -177,7 +177,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
new ArrayList<>());
// Schedule and mark compaction instant as inflight
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
@@ -210,7 +210,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
@@ -239,10 +239,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time");
@@ -304,7 +304,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
}
@@ -328,7 +328,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
@@ -356,7 +356,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
metaClient.reloadActiveTimeline();

View File

@@ -57,7 +57,7 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// Then: ensure no compaction is executed, since there are only 2 delta commits
assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
@@ -76,12 +76,12 @@ public class TestInlineCompaction extends CompactionTestBase {
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// third commit, that will trigger compaction
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
String finalInstant = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
// Then: ensure the file slices are compacted as per policy
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
@@ -100,11 +100,11 @@ public class TestInlineCompaction extends CompactionTestBase {
// after 10s, that will trigger compaction
String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
// Then: ensure the file slices are compacted as per policy
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
@@ -121,17 +121,17 @@ public class TestInlineCompaction extends CompactionTestBase {
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// Then: trigger the compaction because reach 3 commits.
String finalInstant = HoodieActiveTimeline.createNewInstantTime();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
// 4th commit, that will trigger compaction because reach the time elapsed
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
}
}
@@ -145,16 +145,16 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// Then: ensure no compaction is executed, since there are only 3 delta commits
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
// 4th commit, that will trigger compaction
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
}
}
@@ -183,12 +183,12 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
}
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
@@ -218,13 +218,13 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
}
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
@@ -255,13 +255,13 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
}
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}

View File

@@ -220,7 +220,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return (commit, numRecords) -> {
final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), context, table);
return taggedRecords.collect();
@@ -241,7 +241,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return (numRecords) -> {
final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
List<HoodieKey> records = keyGenFunction.apply(numRecords);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
@@ -438,7 +438,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
assertPartitionMetadataForRecords(records, fs);
// verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) {
@@ -506,7 +506,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
assertPartitionMetadataForKeys(keysToDelete, fs);
// verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) {

View File

@@ -346,7 +346,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
metaClient = new HoodieTableMetaClient(conf, basePath);
metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
return metaClient;
}

View File

@@ -151,7 +151,7 @@ public class HoodieClientTestUtils {
String... paths) {
List<HoodieBaseFile> latestFiles = new ArrayList<>();
try {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
for (String path : paths) {
BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));

View File

@@ -66,7 +66,7 @@ public class HoodieMergeOnReadTestUtils {
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema,
String rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf);
Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema);