From 187bedf79531e089eb40acf03ca72c2d54400b0e Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 10 Nov 2021 14:23:24 +0530 Subject: [PATCH] [HUDI-2442] Change default values for certain clustering configs (#3875) --- .../hudi/config/HoodieClusteringConfig.java | 18 +++++++++--------- .../functional/TestHoodieBackedMetadata.java | 2 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 14 +++++++++------- ...tHoodieSparkMergeOnReadTableClustering.java | 1 + .../hudi/functional/TestMORDataSource.scala | 6 ------ 5 files changed, 18 insertions(+), 23 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 5fcd9dfd6..73c66d803 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -51,13 +51,19 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Number of partitions to list to create ClusteringPlan"); + public static final ConfigProperty PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty + .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit") + .defaultValue(String.valueOf(600 * 1024 * 1024L)) + .sinceVersion("0.7.0") + .withDocumentation("Files smaller than the size specified here are candidates for clustering"); + public static final ConfigProperty PLAN_STRATEGY_CLASS_NAME = ConfigProperty .key("hoodie.clustering.plan.strategy.class") - .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy") + .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy") .sinceVersion("0.7.0") .withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan " - + "i.e 
select what file groups are being clustered. Default strategy, looks at the last N (determined by " - + DAYBASED_LOOKBACK_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions."); + + "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by " + + PLAN_STRATEGY_SMALL_FILE_LIMIT.key() + ") to pick the small file slices within partitions for clustering."); public static final ConfigProperty EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty .key("hoodie.clustering.execution.strategy.class") @@ -91,12 +97,6 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.9.0") .withDocumentation("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan"); - public static final ConfigProperty PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigProperty - .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit") - .defaultValue(String.valueOf(600 * 1024 * 1024L)) - .sinceVersion("0.7.0") - .withDocumentation("Files smaller than the size specified here are candidates for clustering"); - public static final ConfigProperty PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group") .defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L)) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f2aa2462b..9ece523ff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -750,7 +750,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // setup clustering config. 
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringSortColumns("_row_key") + .withClusteringSortColumns("_row_key").withInlineClustering(true) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 66fb1cd04..caffb476b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -1356,7 +1356,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true) .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @@ -1367,7 +1367,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // setup clustering config. 
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key") - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true) .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @@ -1391,7 +1391,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception { // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build(); // start clustering, but don't commit List allRecords = testInsertAndClustering(clusteringConfig, populateMetaFields, false); @@ -1422,7 +1422,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testClusteringWithFailingValidator() throws Exception { // setup clustering config. 
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringSortColumns("_hoodie_record_key") + .withClusteringSortColumns("_hoodie_record_key").withInlineClustering(true) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); try { testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); @@ -1436,7 +1436,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception { // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build(); try { testInsertAndClustering(clusteringConfig, false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", ""); fail("expected pre-commit clustering validation to fail because sql query is not configured"); @@ -1449,7 +1449,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception { // setup clustering config. 
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build(); testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(), "", COUNT_SQL_QUERY_FOR_VALIDATION + "#400"); @@ -1459,7 +1459,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception { // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build(); try { testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java index a22a04075..8f7a500d1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java @@ -93,6 +93,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes .withClusteringConfig(HoodieClusteringConfig.newBuilder() .withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0) + .withInlineClustering(true) .withInlineClusteringNumCommits(1) 
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build()) .withRollbackUsingMarkers(false); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index eba2a3d24..5719ad7c1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -706,13 +706,7 @@ class TestMORDataSource extends HoodieClientTestBase { .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // option for clustering - .option("hoodie.parquet.small.file.limit", "0") .option("hoodie.clustering.inline", "true") - .option("hoodie.clustering.inline.max.commits", "1") - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") - .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") - .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(12 *1024 * 1024L)) .option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon") .mode(SaveMode.Overwrite) .save(basePath)