[HUDI-3612] Clustering strategy should create new TypedProperties when modifying it (#5027)
This commit is contained in:
@@ -233,6 +233,9 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
|
|||||||
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
|
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
|
|
||||||
|
// Disable auto commit. Strategy is only expected to write data in new files.
|
||||||
|
config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
|
||||||
|
|
||||||
final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
|
final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
|
||||||
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
|
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
|
||||||
(ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
|
(ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ import org.apache.log4j.Logger;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clustering Strategy based on following.
|
* Clustering Strategy based on following.
|
||||||
@@ -60,13 +59,12 @@ public class JavaSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
|
|||||||
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
|
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
|
||||||
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
|
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
|
||||||
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
||||||
Properties props = getWriteConfig().getProps();
|
|
||||||
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
|
|
||||||
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
|
|
||||||
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
|
|
||||||
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
|
|
||||||
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
|
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
|
||||||
.withEngineType(EngineType.JAVA).withProps(props).build();
|
.withBulkInsertParallelism(numOutputGroups)
|
||||||
|
.withEngineType(EngineType.JAVA)
|
||||||
|
.withProps(getWriteConfig().getProps()).build();
|
||||||
|
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
|
||||||
return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
|
return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
|
||||||
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
|
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.table.action.commit;
|
package org.apache.hudi.table.action.commit;
|
||||||
|
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
@@ -111,7 +112,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
|
|||||||
|
|
||||||
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
|
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
|
||||||
config.getFileIdPrefixProviderClassName(),
|
config.getFileIdPrefixProviderClassName(),
|
||||||
config.getProps());
|
new TypedProperties(config.getProps()));
|
||||||
|
|
||||||
List<WriteStatus> writeStatuses = new ArrayList<>();
|
List<WriteStatus> writeStatuses = new ArrayList<>();
|
||||||
|
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ import org.apache.log4j.Logger;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This strategy is similar to {@link SparkSortAndSizeExecutionStrategy} with the difference being that
|
* This strategy is similar to {@link SparkSortAndSizeExecutionStrategy} with the difference being that
|
||||||
@@ -67,13 +66,12 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
|
|||||||
throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
|
throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
|
||||||
}
|
}
|
||||||
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
||||||
Properties props = getWriteConfig().getProps();
|
|
||||||
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
|
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
|
||||||
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
|
.withBulkInsertParallelism(numOutputGroups)
|
||||||
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
|
.withProps(getWriteConfig().getProps()).build();
|
||||||
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
|
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
|
||||||
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE));
|
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
|
||||||
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
|
|
||||||
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
|
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
|
||||||
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
|
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,7 +36,6 @@ import org.apache.log4j.Logger;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clustering Strategy based on following.
|
* Clustering Strategy based on following.
|
||||||
@@ -58,12 +57,11 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
|
|||||||
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
|
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
|
||||||
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
|
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
|
||||||
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
|
||||||
Properties props = getWriteConfig().getProps();
|
|
||||||
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
|
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
|
||||||
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
|
.withBulkInsertParallelism(numOutputGroups)
|
||||||
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
|
.withProps(getWriteConfig().getProps()).build();
|
||||||
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
|
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
|
||||||
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
|
|
||||||
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
|
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
|
||||||
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
|
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user