[HUDI-1847] Adding inline scheduling support for spark datasource path for compaction and clustering (#4420)
- Adds support in the Spark datasource write path for only scheduling table services (compaction and clustering) inline, so that users can execute them asynchronously without needing a lock service provider.
This commit is contained in:
committed by
GitHub
parent
b3b44236fe
commit
0ababcfaa7
@@ -1376,6 +1376,40 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
|
||||
testInsertTwoBatches(true);
|
||||
|
||||
// setup clustering config.
|
||||
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
|
||||
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
|
||||
.withPreserveHoodieCommitMetadata(true).build();
|
||||
|
||||
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
|
||||
.withClusteringConfig(clusteringConfig)
|
||||
.withProps(getPropertiesForKeyGen()).build();
|
||||
SparkRDDWriteClient client = getHoodieWriteClient(config);
|
||||
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
|
||||
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
|
||||
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
|
||||
client.startCommitWithTime(commitTime1);
|
||||
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
|
||||
JavaRDD<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1);
|
||||
List<WriteStatus> statusList = statuses.collect();
|
||||
assertNoWriteErrors(statusList);
|
||||
client.commit(commitTime1, statuses);
|
||||
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
|
||||
List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
|
||||
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
|
||||
if (scheduleInlineClustering) {
|
||||
assertEquals(1, pendingClusteringPlans.size());
|
||||
} else {
|
||||
assertEquals(0, pendingClusteringPlans.size());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("populateMetaFieldsAndPreserveMetadataParams")
|
||||
public void testClusteringWithSortColumns(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
|
||||
@@ -1529,7 +1563,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords = testInsertTwoBatches(populateMetaFields);
|
||||
testClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
|
||||
return allRecords.getLeft().getLeft();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -132,7 +132,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
|
||||
newCommitTime = "003";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
|
||||
}
|
||||
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
|
||||
@@ -201,7 +201,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
|
||||
newCommitTime = "003";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
|
||||
}
|
||||
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
|
||||
|
||||
@@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
|
||||
String updateTime = "004";
|
||||
client.startCommitWithTime(updateTime);
|
||||
List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
|
||||
updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);
|
||||
updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime, false);
|
||||
|
||||
// verify RO incremental reads - only one base file shows up because updates go into log files
|
||||
incrementalROFiles = getROIncrementalFiles(partitionPath, false);
|
||||
|
||||
@@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
@@ -105,7 +106,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
|
||||
newCommitTime = "004";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
|
||||
|
||||
String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
|
||||
client.compact(compactionCommitTime);
|
||||
@@ -134,6 +135,48 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throws Exception {
|
||||
HoodieFileFormat fileFormat = HoodieFileFormat.PARQUET;
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
|
||||
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
|
||||
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
|
||||
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
|
||||
.build();
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
|
||||
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||
/*
|
||||
* Write 1 (only inserts)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime, true);
|
||||
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||
|
||||
/*
|
||||
* Write 2 (updates)
|
||||
*/
|
||||
newCommitTime = "004";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, true);
|
||||
|
||||
// verify that there is a commit
|
||||
if (scheduleInlineCompaction) {
|
||||
assertEquals(metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterPendingCompactionTimeline().countInstants(), 1);
|
||||
} else {
|
||||
assertEquals(metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterPendingCompactionTimeline().countInstants(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
|
||||
|
||||
@@ -214,12 +214,22 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
|
||||
}
|
||||
|
||||
/**
 * Convenience overload that delegates to the six-argument
 * {@code insertRecordsToMORTable} with {@code doExplicitCommit} set to {@code false}.
 *
 * @param metaClient meta client forwarded to the delegate
 * @param records records forwarded to the delegate
 * @param client write client forwarded to the delegate
 * @param cfg write config forwarded to the delegate
 * @param commitTime commit time forwarded to the delegate
 * @return whatever the six-argument overload returns
 * @throws IOException if the delegate throws it
 */
protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
|
||||
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
|
||||
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
|
||||
return insertRecordsToMORTable(metaClient, records, client, cfg, commitTime, false);
|
||||
}
|
||||
|
||||
protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
|
||||
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime,
|
||||
boolean doExplicitCommit) throws IOException {
|
||||
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
|
||||
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
|
||||
JavaRDD<WriteStatus> statusesRdd = client.insert(writeRecords, commitTime);
|
||||
List<WriteStatus> statuses = statusesRdd.collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
if (doExplicitCommit) {
|
||||
client.commit(commitTime, statusesRdd);
|
||||
}
|
||||
assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
|
||||
|
||||
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), reloadedMetaClient);
|
||||
@@ -243,6 +253,11 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
|
||||
}
|
||||
|
||||
/**
 * Convenience overload that delegates to the six-argument
 * {@code updateRecordsInMORTable} with {@code doExplicitCommit} set to {@code true}.
 *
 * NOTE(review): the five-argument {@code insertRecordsToMORTable} overload passes
 * {@code false} for the same flag - confirm this asymmetry is intended.
 *
 * @param metaClient meta client forwarded to the delegate
 * @param records records forwarded to the delegate
 * @param client write client forwarded to the delegate
 * @param cfg write config forwarded to the delegate
 * @param commitTime commit time forwarded to the delegate
 * @throws IOException if the delegate throws it
 */
protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
|
||||
updateRecordsInMORTable(metaClient, records, client, cfg, commitTime, true);
|
||||
}
|
||||
|
||||
protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime,
|
||||
boolean doExplicitCommit) throws IOException {
|
||||
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
Map<HoodieKey, HoodieRecord> recordsMap = new HashMap<>();
|
||||
@@ -252,9 +267,13 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
|
||||
}
|
||||
}
|
||||
|
||||
List<WriteStatus> statuses = client.upsert(jsc().parallelize(records, 1), commitTime).collect();
|
||||
JavaRDD<WriteStatus> statusesRdd = client.upsert(jsc().parallelize(records, 1), commitTime);
|
||||
List<WriteStatus> statuses = statusesRdd.collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
if (doExplicitCommit) {
|
||||
client.commit(commitTime, statusesRdd);
|
||||
}
|
||||
assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(), new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
|
||||
|
||||
Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
|
||||
|
||||
Reference in New Issue
Block a user