[HUDI-3548] Fix if user specify key "hoodie.datasource.clustering.async.enable" directly, async clustering not work (#4905)
Co-authored-by: Rex An <bonean131@gmail.com>
This commit is contained in:
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.TablePathUtils;
|
||||
import org.apache.hudi.config.HoodieClusteringConfig;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodiePayloadConfig;
|
||||
@@ -177,8 +176,6 @@ public class DataSourceUtils {
|
||||
boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
|
||||
boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
|
||||
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
|
||||
boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
|
||||
boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE().key()));
|
||||
// insert/bulk-insert combining to be true, if filtering for duplicates
|
||||
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
|
||||
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
|
||||
@@ -191,9 +188,6 @@ public class DataSourceUtils {
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
|
||||
.withInlineCompaction(inlineCompact).build())
|
||||
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
|
||||
.withInlineClustering(inlineClusteringEnabled)
|
||||
.withAsyncClustering(asyncClusteringEnabled).build())
|
||||
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
|
||||
.build())
|
||||
// override above with Hoodie configs specified as options.
|
||||
|
||||
@@ -732,8 +732,7 @@ object HoodieSparkSqlWriter {
|
||||
private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
|
||||
parameters: Map[String, String]): Boolean = {
|
||||
log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
|
||||
asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled &&
|
||||
parameters.get(ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean)
|
||||
asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled
|
||||
}
|
||||
|
||||
private def getHoodieTableConfig(sparkContext: SparkContext,
|
||||
|
||||
@@ -25,6 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||
import org.apache.hudi.config.HoodieClusteringConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
|
||||
@@ -222,6 +224,30 @@ public class TestDataSourceUtils {
|
||||
assertThat(partitioner.isPresent(), is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateHoodieConfigWithAsyncClustering() {
|
||||
ArrayList<ImmutablePair<String, Boolean>> asyncClusteringKeyValues = new ArrayList<>(4);
|
||||
asyncClusteringKeyValues.add(new ImmutablePair(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), true));
|
||||
asyncClusteringKeyValues.add(new ImmutablePair(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), true));
|
||||
asyncClusteringKeyValues.add(new ImmutablePair("hoodie.datasource.clustering.async.enable", true));
|
||||
asyncClusteringKeyValues.add(new ImmutablePair("hoodie.clustering.async.enabled", true));
|
||||
|
||||
asyncClusteringKeyValues.stream().forEach(pair -> {
|
||||
HashMap<String, String> params = new HashMap<>(3);
|
||||
params.put(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.TABLE_TYPE().defaultValue());
|
||||
params.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
|
||||
DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
|
||||
params.put(pair.left, pair.right.toString());
|
||||
HoodieWriteConfig hoodieConfig = DataSourceUtils
|
||||
.createHoodieConfig(avroSchemaString, config.getBasePath(), "test", params);
|
||||
assertEquals(pair.right, hoodieConfig.isAsyncClusteringEnabled());
|
||||
|
||||
TypedProperties prop = new TypedProperties();
|
||||
prop.putAll(params);
|
||||
assertEquals(pair.right, HoodieClusteringConfig.from(prop).isAsyncClusteringEnabled());
|
||||
});
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testBuildHiveSyncConfig(boolean useSyncMode) {
|
||||
|
||||
Reference in New Issue
Block a user