[HUDI-2207] Support independent flink hudi clustering function
This commit is contained in:
@@ -1379,7 +1379,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
||||
return scheduleClustering(extraMetadata);
|
||||
}
|
||||
|
||||
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
|
||||
public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
|
||||
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
|
||||
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
|
||||
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
||||
|
||||
@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
|
||||
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
|
||||
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
|
||||
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
|
||||
public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
|
||||
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
|
||||
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
|
||||
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
|
||||
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
|
||||
|
||||
@@ -25,20 +25,20 @@ import org.apache.hudi.io.WriteHandleFactory;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
|
||||
* Output spark partition will have records from only one hoodie partition. - Average records per output spark
|
||||
* partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
|
||||
* Repartition input records into at least expected number of output partitions. It should give below guarantees -
|
||||
* Output partition will have records from only one hoodie partition. - Average records per output
|
||||
* partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
|
||||
*/
|
||||
public interface BulkInsertPartitioner<I> extends Serializable {
|
||||
|
||||
/**
|
||||
* Repartitions the input records into at least expected number of output spark partitions.
|
||||
* Repartitions the input records into at least expected number of output partitions.
|
||||
*
|
||||
* @param records Input Hoodie records
|
||||
* @param outputSparkPartitions Expected number of output partitions
|
||||
* @param records Input Hoodie records
|
||||
* @param outputPartitions Expected number of output partitions
|
||||
* @return
|
||||
*/
|
||||
I repartitionRecords(I records, int outputSparkPartitions);
|
||||
I repartitionRecords(I records, int outputPartitions);
|
||||
|
||||
/**
|
||||
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
|
||||
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
|
||||
/**
|
||||
* Return file group id prefix for the given data partition.
|
||||
* By defauult, return a new file group id prefix, so that incoming records will route to a fresh new file group
|
||||
*
|
||||
* @param partitionId data partition
|
||||
* @return
|
||||
*/
|
||||
@@ -57,6 +58,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
|
||||
|
||||
/**
|
||||
* Return write handle factory for the given partition.
|
||||
*
|
||||
* @param partitionId data partition
|
||||
* @return
|
||||
*/
|
||||
|
||||
@@ -70,6 +70,9 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
|
||||
String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
|
||||
String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
|
||||
String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
|
||||
String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
|
||||
String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
|
||||
String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
|
||||
String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
|
||||
String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
|
||||
|
||||
@@ -82,6 +85,14 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
|
||||
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
|
||||
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
|
||||
return sparkSizeBasedClassName;
|
||||
} else if (flinkRecentDaysClassName.equals(className)) {
|
||||
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
|
||||
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
|
||||
return flinkSizeBasedClassName;
|
||||
} else if (flinkSelectedPartitionsClassName.equals(className)) {
|
||||
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
|
||||
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
|
||||
return flinkSizeBasedClassName;
|
||||
} else if (javaSelectedPartitionClassName.equals(className)) {
|
||||
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
|
||||
LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
|
||||
|
||||
@@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.TableServiceType;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
@@ -39,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieClusteringException;
|
||||
import org.apache.hudi.exception.HoodieCommitException;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
import org.apache.hudi.index.FlinkHoodieIndexFactory;
|
||||
@@ -68,6 +71,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@@ -399,6 +404,52 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
||||
throw new HoodieNotSupportedException("Clustering is not supported yet");
|
||||
}
|
||||
|
||||
private void completeClustering(
|
||||
HoodieReplaceCommitMetadata metadata,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
|
||||
String clusteringCommitTime) {
|
||||
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
|
||||
HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
|
||||
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
|
||||
e.getValue().stream()).collect(Collectors.toList());
|
||||
if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
|
||||
throw new HoodieClusteringException("Clustering failed to write to files:"
|
||||
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
|
||||
}
|
||||
|
||||
try {
|
||||
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
|
||||
finalizeWrite(table, clusteringCommitTime, writeStats);
|
||||
// commit to data table after committing to metadata table.
|
||||
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
|
||||
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
|
||||
writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
|
||||
LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
|
||||
table.getActiveTimeline().transitionReplaceInflightToComplete(
|
||||
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
|
||||
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieClusteringException(
|
||||
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
|
||||
} finally {
|
||||
this.txnManager.endTransaction(Option.of(clusteringInstant));
|
||||
}
|
||||
|
||||
WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
|
||||
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||
if (clusteringTimer != null) {
|
||||
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
|
||||
try {
|
||||
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
|
||||
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
|
||||
} catch (ParseException e) {
|
||||
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
|
||||
+ config.getBasePath() + " at time " + clusteringCommitTime, e);
|
||||
}
|
||||
}
|
||||
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
@@ -412,6 +463,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
||||
// no need to execute the upgrade/downgrade on each write in streaming.
|
||||
}
|
||||
|
||||
public void completeTableService(
|
||||
TableServiceType tableServiceType,
|
||||
HoodieCommitMetadata metadata,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
|
||||
String commitInstant) {
|
||||
switch (tableServiceType) {
|
||||
case CLUSTER:
|
||||
completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
|
||||
break;
|
||||
case COMPACT:
|
||||
completeCompaction(metadata, table, commitInstant);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Upgrade downgrade the Hoodie table.
|
||||
*
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.clustering.plan.strategy;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
|
||||
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Clustering Strategy based on following.
|
||||
* 1) Only looks at latest 'daybased.lookback.partitions' partitions.
|
||||
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
|
||||
*/
|
||||
public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
|
||||
extends FlinkSizeBasedClusteringPlanStrategy<T> {
|
||||
private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);
|
||||
|
||||
public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
|
||||
HoodieFlinkEngineContext engineContext,
|
||||
HoodieWriteConfig writeConfig) {
|
||||
super(table, engineContext, writeConfig);
|
||||
}
|
||||
|
||||
public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
|
||||
HoodieFlinkEngineContext engineContext,
|
||||
HoodieWriteConfig writeConfig) {
|
||||
super(table, engineContext, writeConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
|
||||
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
|
||||
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
|
||||
return partitionPaths.stream()
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
|
||||
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.clustering.plan.strategy;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
|
||||
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
|
||||
|
||||
/**
|
||||
* Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive.
|
||||
*/
|
||||
public class FlinkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
|
||||
extends FlinkSizeBasedClusteringPlanStrategy<T> {
|
||||
private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class);
|
||||
|
||||
public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
|
||||
public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
|
||||
|
||||
public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
|
||||
HoodieFlinkEngineContext engineContext,
|
||||
HoodieWriteConfig writeConfig) {
|
||||
super(table, engineContext, writeConfig);
|
||||
}
|
||||
|
||||
public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
|
||||
HoodieFlinkEngineContext engineContext,
|
||||
HoodieWriteConfig writeConfig) {
|
||||
super(table, engineContext, writeConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
|
||||
String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
|
||||
String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
|
||||
List<String> filteredPartitions = partitionPaths.stream()
|
||||
.filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0)
|
||||
.collect(Collectors.toList());
|
||||
LOG.info("Filtered to the following partitions: " + filteredPartitions);
|
||||
return filteredPartitions;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.clustering.plan.strategy;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieClusteringGroup;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
|
||||
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
|
||||
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
|
||||
|
||||
/**
|
||||
* Clustering Strategy based on following.
|
||||
* 1) Creates clustering groups based on max size allowed per group.
|
||||
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
|
||||
*/
|
||||
public class FlinkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
|
||||
extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
|
||||
private static final Logger LOG = LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
|
||||
|
||||
public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
|
||||
HoodieFlinkEngineContext engineContext,
|
||||
HoodieWriteConfig writeConfig) {
|
||||
super(table, engineContext, writeConfig);
|
||||
}
|
||||
|
||||
public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
|
||||
HoodieFlinkEngineContext engineContext,
|
||||
HoodieWriteConfig writeConfig) {
|
||||
super(table, engineContext, writeConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
|
||||
HoodieWriteConfig writeConfig = getWriteConfig();
|
||||
|
||||
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
|
||||
List<FileSlice> currentGroup = new ArrayList<>();
|
||||
long totalSizeSoFar = 0;
|
||||
|
||||
for (FileSlice currentSlice : fileSlices) {
|
||||
// check if max size is reached and create new group, if needed.
|
||||
// in now, every clustering group out put is 1 file group.
|
||||
if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() && !currentGroup.isEmpty()) {
|
||||
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
|
||||
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size());
|
||||
fileSliceGroups.add(Pair.of(currentGroup, 1));
|
||||
currentGroup = new ArrayList<>();
|
||||
totalSizeSoFar = 0;
|
||||
}
|
||||
|
||||
// Add to the current file-group
|
||||
currentGroup.add(currentSlice);
|
||||
// assume each file group size is ~= parquet.max.file.size
|
||||
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
|
||||
}
|
||||
|
||||
if (!currentGroup.isEmpty()) {
|
||||
fileSliceGroups.add(Pair.of(currentGroup, 1));
|
||||
}
|
||||
|
||||
return fileSliceGroups.stream().map(fileSliceGroup ->
|
||||
HoodieClusteringGroup.newBuilder()
|
||||
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
|
||||
.setNumOutputFileGroups(fileSliceGroup.getRight())
|
||||
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> getStrategyParams() {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
|
||||
params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns());
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
|
||||
return partitionPaths;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
|
||||
return super.getFileSlicesEligibleForClustering(partition)
|
||||
// Only files that have basefile size smaller than small file size are eligible.
|
||||
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
|
||||
}
|
||||
|
||||
private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
|
||||
return (int) Math.ceil(groupSize / (double) targetFileSize);
|
||||
}
|
||||
}
|
||||
@@ -55,6 +55,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
|
||||
import org.apache.hudi.table.action.clean.CleanActionExecutor;
|
||||
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
|
||||
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
|
||||
@@ -286,7 +287,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
|
||||
|
||||
@Override
|
||||
public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
|
||||
throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table");
|
||||
return new ClusteringPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -49,7 +49,7 @@ public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
|
||||
|
||||
@Override
|
||||
public List<HoodieRecord<T>> repartitionRecords(
|
||||
List<HoodieRecord<T>> records, int outputSparkPartitions) {
|
||||
List<HoodieRecord<T>> records, int outputPartitions) {
|
||||
return records.stream().sorted((o1, o2) -> {
|
||||
Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled);
|
||||
Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled);
|
||||
|
||||
@@ -37,7 +37,7 @@ public class JavaGlobalSortPartitioner<T extends HoodieRecordPayload>
|
||||
|
||||
@Override
|
||||
public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
|
||||
int outputSparkPartitions) {
|
||||
int outputPartitions) {
|
||||
// Now, sort the records and line them up nicely for loading.
|
||||
records.sort(new Comparator() {
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user