diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index bad6c2d72..ead806bca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.TablePathUtils;
-import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
@@ -177,8 +176,6 @@ public class DataSourceUtils {
     boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
     boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
         .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
-    boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
-    boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE().key()));
     // insert/bulk-insert combining to be true, if filtering for duplicates
     boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
     HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
@@ -191,9 +188,6 @@ public class DataSourceUtils {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
             .withInlineCompaction(inlineCompact).build())
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
-            .withInlineClustering(inlineClusteringEnabled)
-            .withAsyncClustering(asyncClusteringEnabled).build())
         .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
             .build())
         // override above with Hoodie configs specified as options.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6b6ddc38e..022eef5ff 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -732,8 +732,7 @@ object HoodieSparkSqlWriter {
   private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                        parameters: Map[String, String]): Boolean = {
     log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
-    asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled &&
-      parameters.get(ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean)
+    asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled
   }
 
   private def getHoodieTableConfig(sparkContext: SparkContext,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index bf3520f09..af5bbe771 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -25,6 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
@@ -222,6 +224,30 @@ public class TestDataSourceUtils {
     assertThat(partitioner.isPresent(), is(true));
   }
 
+  @Test
+  public void testCreateHoodieConfigWithAsyncClustering() {
+    ArrayList<ImmutablePair<String, Boolean>> asyncClusteringKeyValues = new ArrayList<>(4);
+    asyncClusteringKeyValues.add(new ImmutablePair<>(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), true));
+    asyncClusteringKeyValues.add(new ImmutablePair<>(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), true));
+    asyncClusteringKeyValues.add(new ImmutablePair<>("hoodie.datasource.clustering.async.enable", true));
+    asyncClusteringKeyValues.add(new ImmutablePair<>("hoodie.clustering.async.enabled", true));
+
+    asyncClusteringKeyValues.stream().forEach(pair -> {
+      HashMap<String, String> params = new HashMap<>(3);
+      params.put(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.TABLE_TYPE().defaultValue());
+      params.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
+          DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
+      params.put(pair.left, pair.right.toString());
+      HoodieWriteConfig hoodieConfig = DataSourceUtils
+          .createHoodieConfig(avroSchemaString, config.getBasePath(), "test", params);
+      assertEquals(pair.right, hoodieConfig.isAsyncClusteringEnabled());
+
+      TypedProperties prop = new TypedProperties();
+      prop.putAll(params);
+      assertEquals(pair.right, HoodieClusteringConfig.from(prop).isAsyncClusteringEnabled());
+    });
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testBuildHiveSyncConfig(boolean useSyncMode) {
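Note (illustrative, not part of the patch): after this change, DataSourceUtils.createHoodieConfig no longer builds a hardcoded HoodieClusteringConfig from the datasource options, and HoodieSparkSqlWriter.isAsyncClusteringEnabled no longer re-reads the raw parameter map. The clustering keys now reach HoodieWriteConfig solely through the generic "override above with Hoodie configs specified as options" pass-through, which is what the new test asserts for both the datasource alias and the core key. A minimal Scala sketch of a write that exercises this path, assuming a DataFrame df and a writable basePath (hypothetical names; record-key and precombine fields depend on your schema):

    import org.apache.spark.sql.SaveMode

    df.write.format("hudi")
      .option("hoodie.table.name", "test")
      // Either key below enables async clustering after this change;
      // both are forwarded verbatim into HoodieWriteConfig.
      .option("hoodie.datasource.clustering.async.enable", "true")
      // .option("hoodie.clustering.async.enabled", "true")
      .mode(SaveMode.Append)
      .save(basePath)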