[HUDI-2442] Change default values for certain clustering configs (#3875)
@@ -51,13 +51,19 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Number of partitions to list to create ClusteringPlan");
 
+  public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty
+      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
+      .defaultValue(String.valueOf(600 * 1024 * 1024L))
+      .sinceVersion("0.7.0")
+      .withDocumentation("Files smaller than the size specified here are candidates for clustering");
+
   public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
       .key("hoodie.clustering.plan.strategy.class")
-      .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy")
+      .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
       .sinceVersion("0.7.0")
       .withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
-          + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
-          + DAYBASED_LOOKBACK_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
+          + "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by "
+          + PLAN_STRATEGY_SMALL_FILE_LIMIT.key() + ") to pick the small file slices within partitions for clustering.");
 
   public static final ConfigProperty<String> EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty
       .key("hoodie.clustering.execution.strategy.class")
@@ -91,12 +97,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
 
-  public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty
-      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
-      .defaultValue(String.valueOf(600 * 1024 * 1024L))
-      .sinceVersion("0.7.0")
-      .withDocumentation("Files smaller than the size specified here are candidates for clustering");
-
   public static final ConfigProperty<String> PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
      .defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L))
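The two hunks above change the default plan strategy from SparkRecentDaysClusteringPlanStrategy to SparkSizeBasedClusteringPlanStrategy and move PLAN_STRATEGY_SMALL_FILE_LIMIT up next to it. As a minimal sketch, not part of this commit, a job that still wants the previous day-based behavior could pin the old strategy through write properties; the config keys and class names come from the diff, while the wrapper class and base path are hypothetical placeholders:

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

public class ClusteringDefaultsOverrideSketch {
  public static HoodieWriteConfig build() {
    Properties props = new Properties();
    // Opt back into the previous plan strategy; with no override, the new default is
    // org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy.
    props.setProperty("hoodie.clustering.plan.strategy.class",
        "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy");
    // Files under this size (default 600 * 1024 * 1024 bytes per the diff) are clustering candidates.
    props.setProperty("hoodie.clustering.plan.strategy.small.file.limit",
        String.valueOf(600 * 1024 * 1024L));
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/sample_table") // hypothetical base path
        .withProperties(props)
        .build();
  }
}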
@@ -750,7 +750,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringSortColumns("_row_key")
+        .withClusteringSortColumns("_row_key").withInlineClustering(true)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
 
     HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
@@ -1356,7 +1356,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
         .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
@@ -1367,7 +1367,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
-        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
         .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
@@ -1391,7 +1391,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
 
     // start clustering, but don't commit
     List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, populateMetaFields, false);
@@ -1422,7 +1422,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testClusteringWithFailingValidator() throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringSortColumns("_hoodie_record_key")
+        .withClusteringSortColumns("_hoodie_record_key").withInlineClustering(true)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
     try {
       testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1436,7 +1436,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
     try {
       testInsertAndClustering(clusteringConfig, false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
       fail("expected pre-commit clustering validation to fail because sql query is not configured");
@@ -1449,7 +1449,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
 
     testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(),
         "", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
@@ -1459,7 +1459,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
-        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
 
     try {
       testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(),
@@ -93,6 +93,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
         .withClusteringConfig(HoodieClusteringConfig.newBuilder()
             .withClusteringMaxNumGroups(10)
             .withClusteringTargetPartitions(0)
+            .withInlineClustering(true)
            .withInlineClusteringNumCommits(1)
             .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
         .withRollbackUsingMarkers(false);
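Across the test changes above, the clustering config builder now turns inline clustering on explicitly via withInlineClustering(true) rather than leaving it implicit, and the plan strategy falls back to the new SparkSizeBasedClusteringPlanStrategy default. A rough sketch of that builder pattern (not taken verbatim from the commit; all builder methods appear in the diff, only the wrapper class is invented for illustration):

import org.apache.hudi.config.HoodieClusteringConfig;

public class InlineClusteringConfigSketch {
  public static HoodieClusteringConfig build() {
    return HoodieClusteringConfig.newBuilder()
        .withClusteringMaxNumGroups(10)        // cap the number of file groups per clustering plan
        .withClusteringTargetPartitions(0)     // partition count used by the tests above
        .withInlineClustering(true)            // enable clustering inline with ingestion
        .withInlineClusteringNumCommits(1)     // schedule clustering after every commit
        .withClusteringSortColumns("_row_key") // sort column used by the tests above
        .build();
  }
}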
@@ -706,13 +706,7 @@ class TestMORDataSource extends HoodieClientTestBase {
       .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       // option for clustering
-      .option("hoodie.parquet.small.file.limit", "0")
       .option("hoodie.clustering.inline", "true")
-      .option("hoodie.clustering.inline.max.commits", "1")
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
-      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(12 *1024 * 1024L))
       .option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon")
       .mode(SaveMode.Overwrite)
       .save(basePath)
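For datasource writes, the TestMORDataSource change above drops the per-test overrides and keeps only the inline-clustering switch and the sort columns, relying on the new defaults for everything else. A minimal sketch of an equivalent write in Java, assuming a Spark session and an input DataFrame; the table name and record key field are hypothetical placeholders, the clustering option keys are the ones kept in the diff:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class InlineClusteringWriteSketch {
  // Write a DataFrame as a Hudi table with inline clustering enabled.
  public static void write(Dataset<Row> df, String basePath) {
    df.write()
        .format("hudi")
        .option("hoodie.table.name", "sample_table")                   // hypothetical table name
        .option("hoodie.datasource.write.recordkey.field", "_row_key") // hypothetical record key
        .option("hoodie.clustering.inline", "true")
        .option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon")
        .mode(SaveMode.Overwrite)
        .save(basePath);
  }
}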