Merge pull request #3599 from yuzhaojing/HUDI-2207

[HUDI-2207] Support independent flink hudi clustering function
Zhaojing Yu authored on 2022-05-25 00:47:28 +08:00; committed by GitHub
23 changed files with 1700 additions and 14 deletions

File: BaseHoodieWriteClient.java

@@ -1379,7 +1379,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     return scheduleClustering(extraMetadata);
   }
 
-  protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
+  public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
     Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
     String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());

File: HoodieClusteringConfig.java

@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
   public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
   public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
+  public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      "org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
   public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
       "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
   public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =

File: BulkInsertPartitioner.java

@@ -25,20 +25,20 @@ import org.apache.hudi.io.WriteHandleFactory;
 import java.io.Serializable;
 
 /**
- * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
- * Output spark partition will have records from only one hoodie partition. - Average records per output spark
- * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
+ * Repartition input records into at least expected number of output partitions. It should give below guarantees -
+ * Output partition will have records from only one hoodie partition. - Average records per output
+ * partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
  */
 public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
-   * Repartitions the input records into at least expected number of output spark partitions.
+   * Repartitions the input records into at least expected number of output partitions.
    *
    * @param records Input Hoodie records
-   * @param outputSparkPartitions Expected number of output partitions
+   * @param outputPartitions Expected number of output partitions
    * @return
    */
-  I repartitionRecords(I records, int outputSparkPartitions);
+  I repartitionRecords(I records, int outputPartitions);
 
   /**
    * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
    * Return file group id prefix for the given data partition.
    * By defauult, return a new file group id prefix, so that incoming records will route to a fresh new file group
+   *
    * @param partitionId data partition
    * @return
    */
@@ -57,6 +58,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
    * Return write handle factory for the given partition.
+   *
    * @param partitionId data partition
    * @return
    */
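
To make the renamed, engine-agnostic contract concrete, here is a minimal sketch of a List-based implementation such as the Java/Flink clients use. It is not part of this patch: the class name is illustrative, and the arePartitionRecordsSorted accessor name is assumed from the Hudi codebase (only its javadoc appears in the hunk above).

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;

// Sketch: sorting by partition path keeps the records of one hoodie
// partition together, the first guarantee the interface javadoc asks for.
class PartitionPathSortPartitioner<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<List<HoodieRecord<T>>> {

  @Override
  public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records, int outputPartitions) {
    // A plain list has no physical partitions, so outputPartitions is only a hint here.
    return records.stream()
        .sorted(Comparator.comparing(HoodieRecord::getPartitionPath))
        .collect(Collectors.toList());
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return true; // records come out ordered by partition path
  }
}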

File: ClusteringPlanStrategy.java

@@ -70,6 +70,9 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
   String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
   String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
   String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+  String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+  String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
+  String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
   String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
   String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
@@ -82,6 +85,14 @@ public abstract class ClusteringPlanStrategy<T extends HoodieRecordPayload,I,K,O
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
       LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
       return sparkSizeBasedClassName;
+    } else if (flinkRecentDaysClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+      LOG.warn(String.format(logStr, className, flinkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+      return flinkSizeBasedClassName;
+    } else if (flinkSelectedPartitionsClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+      LOG.warn(String.format(logStr, className, flinkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
+      return flinkSizeBasedClassName;
     } else if (javaSelectedPartitionClassName.equals(className)) {
       config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
       LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));

File: HoodieFlinkWriteClient.java

@@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -39,6 +41,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
@@ -68,6 +71,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -399,6 +404,52 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  private void completeClustering(
+      HoodieReplaceCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      String clusteringCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
+    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+    List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
+        e.getValue().stream()).collect(Collectors.toList());
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
+      throw new HoodieClusteringException("Clustering failed to write to files:"
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
+    }
+    try {
+      this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+      finalizeWrite(table, clusteringCommitTime, writeStats);
+      // Commit to the data table after committing to the metadata table.
+      // We do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen within a
+      // single lock (single writer), because more than one write to the metadata table would result in conflicts since all of them update the same partition.
+      writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
+      LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
+      table.getActiveTimeline().transitionReplaceInflightToComplete(
+          HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException e) {
+      throw new HoodieClusteringException(
+          "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
+    } finally {
+      this.txnManager.endTransaction(Option.of(clusteringInstant));
+    }
+    WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    if (clusteringTimer != null) {
+      long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
+      try {
+        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
+            durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      } catch (ParseException e) {
+        throw new HoodieCommitException("Commit time is not of valid format. Failed to commit clustering "
+            + config.getBasePath() + " at time " + clusteringCommitTime, e);
+      }
+    }
+    LOG.info("Clustering finished successfully on commit " + clusteringCommitTime);
+  }
 
   @Override
   protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -412,6 +463,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     // no need to execute the upgrade/downgrade on each write in streaming.
   }
 
+  public void completeTableService(
+      TableServiceType tableServiceType,
+      HoodieCommitMetadata metadata,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+      String commitInstant) {
+    switch (tableServiceType) {
+      case CLUSTER:
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
+        break;
+      case COMPACT:
+        completeCompaction(metadata, table, commitInstant);
+        break;
+      default:
+        throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
+    }
+  }
 
   /**
    * Upgrade downgrade the Hoodie table.
    *

File: FlinkRecentDaysClusteringPlanStrategy.java

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* Clustering Strategy based on following.
* 1) Only looks at latest 'daybased.lookback.partitions' partitions.
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
*/
public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends FlinkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);
public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
}
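
A quick walk-through of the filter above with hypothetical values: with a lookback of 2 ('daybased.lookback.partitions') and a skip of 1 ('daybased.skipfromlatest.partitions'), the newest partition is skipped and the next two newest are kept.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class RecentDaysFilterExample {
  public static void main(String[] args) {
    // Same stream pipeline as filterPartitionPaths, with lookback = 2, skip = 1.
    List<String> partitions = Arrays.asList("2022/05/01", "2022/05/02", "2022/05/03", "2022/05/04");
    List<String> picked = partitions.stream()
        .sorted(Comparator.reverseOrder()) // newest first: 05/04, 05/03, 05/02, 05/01
        .skip(1)                           // skip the newest partition: 2022/05/04
        .limit(2)                          // keep the next two
        .collect(Collectors.toList());
    System.out.println(picked);            // [2022/05/03, 2022/05/02]
  }
}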

File: FlinkSelectedPartitionsClusteringPlanStrategy.java

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
/**
* Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive.
*/
public class FlinkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends FlinkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class);
public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
List<String> filteredPartitions = partitionPaths.stream()
.filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0)
.collect(Collectors.toList());
LOG.info("Filtered to the following partitions: " + filteredPartitions);
return filteredPartitions;
}
}
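
For orientation, a sketch of supplying the two strategy params this class reads; the resolved keys are CLUSTERING_STRATEGY_PARAM_PREFIX plus the suffixes above, and the partition values and base path are placeholders:

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

class SelectedPartitionsConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Inclusive [begin, end] range, matching the compareTo filter above.
    props.setProperty("hoodie.clustering.plan.strategy.cluster.begin.partition", "2022/05/01");
    props.setProperty("hoodie.clustering.plan.strategy.cluster.end.partition", "2022/05/03");
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // placeholder base path
        .withProps(props)
        .build();
    System.out.println(writeConfig.getProps().getProperty("hoodie.clustering.plan.strategy.cluster.begin.partition"));
  }
}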

File: FlinkSizeBasedClusteringPlanStrategy.java

@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
/**
* Clustering Strategy based on following.
* 1) Creates clustering groups based on max size allowed per group.
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
*/
public class FlinkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public FlinkSizeBasedClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
HoodieWriteConfig writeConfig = getWriteConfig();
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
long totalSizeSoFar = 0;
for (FileSlice currentSlice : fileSlices) {
// check if max size is reached and create new group, if needed.
// For now, every clustering group outputs a single file group.
if (totalSizeSoFar >= writeConfig.getClusteringTargetFileMaxBytes() && !currentGroup.isEmpty()) {
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size());
fileSliceGroups.add(Pair.of(currentGroup, 1));
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
// Add to the current file-group
currentGroup.add(currentSlice);
// assume each file group size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}
if (!currentGroup.isEmpty()) {
fileSliceGroups.add(Pair.of(currentGroup, 1));
}
return fileSliceGroups.stream().map(fileSliceGroup ->
HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
@Override
protected Map<String, String> getStrategyParams() {
Map<String, String> params = new HashMap<>();
if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns());
}
return params;
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
return partitionPaths;
}
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files that have basefile size smaller than small file size are eligible.
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
}
private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
return (int) Math.ceil(groupSize / (double) targetFileSize);
}
}
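
A worked example of the greedy grouping above (sizes are illustrative): with a 1 GB target and four 600 MB base files, the running total crosses the target after two slices, so the partition yields two groups of two slices each, one output file group per group.

class SizeBasedGroupingExample {
  public static void main(String[] args) {
    long targetBytes = 1024L * 1024 * 1024;  // clustering target file max bytes (1 GB)
    long sliceBytes = 600L * 1024 * 1024;    // four hypothetical 600 MB slices
    long totalSizeSoFar = 0;
    int groups = 1;
    for (int i = 0; i < 4; i++) {
      // Same check as buildClusteringGroupsForPartition: close the group once
      // the running total has reached the target, then start a new one.
      if (totalSizeSoFar >= targetBytes) {
        groups++;
        totalSizeSoFar = 0;
      }
      totalSizeSoFar += sliceBytes;
    }
    System.out.println(groups); // 2 -> groups [s1, s2] and [s3, s4]
  }
}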

File: HoodieFlinkCopyOnWriteTable.java

@@ -55,6 +55,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
+import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
@@ -286,7 +287,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
   @Override
   public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
-    throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table");
+    return new ClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
   }
 
   @Override

File: JavaCustomColumnsSortPartitioner.java

@@ -49,7 +49,7 @@ public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
   @Override
   public List<HoodieRecord<T>> repartitionRecords(
-      List<HoodieRecord<T>> records, int outputSparkPartitions) {
+      List<HoodieRecord<T>> records, int outputPartitions) {
     return records.stream().sorted((o1, o2) -> {
       Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled);
       Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled);

File: JavaGlobalSortPartitioner.java

@@ -37,7 +37,7 @@ public class JavaGlobalSortPartitioner<T extends HoodieRecordPayload>
   @Override
   public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records,
-                                                  int outputSparkPartitions) {
+                                                  int outputPartitions) {
     // Now, sort the records and line them up nicely for loading.
     records.sort(new Comparator() {
       @Override

File: FlinkOptions.java

@@ -18,6 +18,7 @@
 package org.apache.hudi.configuration;
 
+import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
@@ -583,6 +584,72 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(40)// default min 40 commits
       .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40");
 
+  // ------------------------------------------------------------------------
+  //  Clustering Options
+  // ------------------------------------------------------------------------
+
+  public static final ConfigOption<Boolean> CLUSTERING_SCHEDULE_ENABLED = ConfigOptions
+      .key("clustering.schedule.enabled")
+      .booleanType()
+      .defaultValue(false) // default false for pipeline
+      .withDescription("Schedule the cluster plan, default false");
+
+  public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions
+      .key("clustering.delta_commits")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Max delta commits needed to trigger clustering, default 4 commits");
+
+  public static final ConfigOption<Integer> CLUSTERING_TASKS = ConfigOptions
+      .key("clustering.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do actual clustering, default is 4");
+
+  public static final ConfigOption<Integer> CLUSTERING_TARGET_PARTITIONS = ConfigOptions
+      .key("clustering.plan.strategy.daybased.lookback.partitions")
+      .intType()
+      .defaultValue(2)
+      .withDescription("Number of partitions to list to create ClusteringPlan, default is 2");
+
+  public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
+      .key("clustering.plan.strategy.class")
+      .stringType()
+      .defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
+      .withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+          + "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+          + CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+      .key("clustering.plan.strategy.target.file.max.bytes")
+      .intType()
+      .defaultValue(1024 * 1024 * 1024) // default 1 GB
+      .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+      .key("clustering.plan.strategy.small.file.limit")
+      .intType()
+      .defaultValue(600) // default 600 MB
+      .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
+
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
+      .key("clustering.plan.strategy.daybased.skipfromlatest.partitions")
+      .intType()
+      .defaultValue(0)
+      .withDescription("Number of partitions to skip from latest when choosing partitions to create ClusteringPlan");
+
+  public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
+      .key("clustering.plan.strategy.sort.columns")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Columns to sort the data by when clustering");
+
+  public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions
+      .key("clustering.plan.strategy.max.num.groups")
+      .intType()
+      .defaultValue(30)
+      .withDescription("Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism, default is 30");
+
   // ------------------------------------------------------------------------
   //  Hive Sync Options
   // ------------------------------------------------------------------------
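
For context, a sketch of setting these options on a Flink job configuration; the surrounding job setup is omitted and the values are placeholders, not recommendations:

import org.apache.flink.configuration.Configuration;

class ClusteringOptionsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("clustering.schedule.enabled", true);  // FlinkOptions.CLUSTERING_SCHEDULE_ENABLED
    conf.setInteger("clustering.delta_commits", 4);        // trigger clustering every 4 delta commits
    conf.setInteger("clustering.tasks", 4);                // parallelism of the clustering tasks
    conf.setString("clustering.plan.strategy.sort.columns", "uuid,age"); // hypothetical sort columns
  }
}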

File: SortOperatorGen.java

@@ -48,7 +48,7 @@ public class SortOperatorGen {
         codeGen.generateRecordComparator("SortComparator"));
   }
 
-  private SortCodeGenerator createSortCodeGenerator() {
+  public SortCodeGenerator createSortCodeGenerator() {
     SortSpec.SortSpecBuilder builder = SortSpec.builder();
     IntStream.range(0, sortIndices.length).forEach(i -> builder.addField(i, true, true));
     return new SortCodeGenerator(tableConfig, rowType, builder.build());

File: ClusteringCommitEvent.java

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.client.WriteStatus;
import java.io.Serializable;
import java.util.List;
/**
* Represents a commit event from the clustering task {@link ClusteringFunction}.
*/
public class ClusteringCommitEvent implements Serializable {
private static final long serialVersionUID = 1L;
/**
* The clustering commit instant time.
*/
private String instant;
/**
* The write statuses.
*/
private List<WriteStatus> writeStatuses;
/**
* The clustering task identifier.
*/
private int taskID;
public ClusteringCommitEvent() {
}
public ClusteringCommitEvent(String instant, List<WriteStatus> writeStatuses, int taskID) {
this.instant = instant;
this.writeStatuses = writeStatuses;
this.taskID = taskID;
}
public void setInstant(String instant) {
this.instant = instant;
}
public void setWriteStatuses(List<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}
public void setTaskID(int taskID) {
this.taskID = taskID;
}
public String getInstant() {
return instant;
}
public List<WriteStatus> getWriteStatuses() {
return writeStatuses;
}
public int getTaskID() {
return taskID;
}
}

File: ClusteringCommitSink.java

@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Function to check and commit the clustering action.
*
 * <p>Each time it receives a clustering commit event {@link ClusteringCommitEvent},
 * it loads and checks the clustering plan {@link org.apache.hudi.avro.model.HoodieClusteringPlan};
 * if all the clustering operations {@link org.apache.hudi.common.model.ClusteringOperation}
 * of the plan have finished, it tries to commit the clustering action.
*
* <p>It also inherits the {@link CleanFunction} cleaning ability. This is needed because
* the SQL API does not allow multiple sinks in one table sink provider.
*/
public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
private static final Logger LOG = LoggerFactory.getLogger(ClusteringCommitSink.class);
/**
* Config options.
*/
private final Configuration conf;
private transient HoodieFlinkTable<?> table;
/**
* Buffer to collect the event from each clustering task {@code ClusteringFunction}.
* The key is the instant time.
*/
private transient Map<String, List<ClusteringCommitEvent>> commitBuffer;
public ClusteringCommitSink(Configuration conf) {
super(conf);
this.conf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if (writeClient == null) {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
}
this.commitBuffer = new HashMap<>();
this.table = writeClient.getHoodieTable();
}
@Override
public void invoke(ClusteringCommitEvent event, Context context) throws Exception {
final String instant = event.getInstant();
commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
.add(event);
commitIfNecessary(instant, commitBuffer.get(instant));
}
/**
 * Condition to commit: the commit buffer has the same size as the clustering plan's input groups,
 * and all the clustering commit events {@link ClusteringCommitEvent} share the same clustering instant time.
 *
 * @param instant Clustering commit instant time
 * @param events  Commit events ever received for the instant
 */
private void commitIfNecessary(String instant, List<ClusteringCommitEvent> events) {
HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(instant);
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
StreamerUtil.createMetaClient(this.conf), clusteringInstant);
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
boolean isReady = clusteringPlan.getInputGroups().size() == events.size();
if (!isReady) {
return;
}
List<WriteStatus> statuses = events.stream()
.map(ClusteringCommitEvent::getWriteStatuses)
.flatMap(Collection::stream)
.collect(Collectors.toList());
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(statuses);
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
validateWriteResult(clusteringPlan, instant, writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
writeMetadata.getWriteStats().get(),
writeMetadata.getPartitionToReplaceFileIds(),
Option.empty(),
WriteOperationType.CLUSTER,
this.writeClient.getConfig().getSchema(),
HoodieTimeline.REPLACE_COMMIT_ACTION);
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
// commit the clustering
this.table.getMetaClient().reloadActiveTimeline();
this.writeClient.completeTableService(
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
// reset the status
reset(instant);
}
private void reset(String instant) {
this.commitBuffer.remove(instant);
}
/**
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
*/
private static void validateWriteResult(HoodieClusteringPlan clusteringPlan, String instantTime, HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
if (writeMetadata.getWriteStatuses().isEmpty()) {
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
}
private static Map<String, List<String>> getPartitionToReplacedFileIds(
HoodieClusteringPlan clusteringPlan,
HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.filter(fg -> !newFilesWritten.contains(fg))
.collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}
}
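
A small worked example of getPartitionToReplacedFileIds above (partition paths and file ids are made up): every file group named in the clustering plan that was not rewritten under the same id is marked as replaced, grouped by partition.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ReplacedFileIdsExample {
  public static void main(String[] args) {
    // File groups named in the plan as {partitionPath, fileId} pairs.
    List<String[]> planned = Arrays.asList(
        new String[] {"2022/05/01", "fg-1"},
        new String[] {"2022/05/01", "fg-2"},
        new String[] {"2022/05/02", "fg-3"});
    // Ids the clustering itself wrote into; those must not be marked replaced.
    Set<String> newFilesWritten = new HashSet<>(Collections.singletonList("fg-2"));
    Map<String, List<String>> replaced = planned.stream()
        .filter(fg -> !newFilesWritten.contains(fg[1]))
        .collect(Collectors.groupingBy(fg -> fg[0],
            Collectors.mapping(fg -> fg[1], Collectors.toList())));
    System.out.println(replaced); // e.g. {2022/05/01=[fg-1], 2022/05/02=[fg-3]} (map order unspecified)
  }
}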

File: ClusteringOperator.java

@@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
/**
* Operator to execute the actual clustering task assigned by the clustering plan task.
 * To make execution scalable, the input should be shuffled by the clustering plan event {@link ClusteringPlanEvent}.
*/
public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent> implements
OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {
private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);
private final Configuration conf;
private final RowType rowType;
private int taskID;
private transient HoodieWriteConfig writeConfig;
private transient HoodieFlinkTable<?> table;
private transient Schema schema;
private transient Schema readerSchema;
private transient int[] requiredPos;
private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private transient HoodieFlinkWriteClient writeClient;
private transient BulkInsertWriterHelper writerHelper;
private transient String instantTime;
private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<ClusteringCommitEvent> collector;
private transient BinaryRowDataSerializer binarySerializer;
public ClusteringOperator(Configuration conf, RowType rowType) {
this.conf = conf;
this.rowType = rowType;
}
@Override
public void open() throws Exception {
super.open();
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.table = writeClient.getHoodieTable();
this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
this.requiredPos = getRequiredPositions();
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
AbstractRowDataSerializer inputSerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
this.sorter =
new BinaryExternalSorter(
this.getContainingTask(),
memManager,
computeMemorySize(),
this.getContainingTask().getEnvironment().getIOManager(),
inputSerializer,
binarySerializer,
computer,
comparator,
getContainingTask().getJobConfiguration());
this.sorter.startThreads();
collector = new StreamRecordCollector<>(output);
// register the metrics.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}
@Override
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
ClusteringPlanEvent event = element.getValue();
final String instantTime = event.getClusteringInstantTime();
final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();
initWriterHelper(instantTime);
List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
Iterator<RowData> iterator;
if (hasLogFiles) {
// if there are log files, we read all records into memory for a file group and apply updates.
iterator = readRecordsForGroupWithLogs(clusteringOps, instantTime);
} else {
// We want to optimize reading records for the case where there are no log files.
iterator = readRecordsForGroupBaseFiles(clusteringOps);
}
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
this.sorter.write(binaryRowData);
}
BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
this.writerHelper.write(row);
}
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
/**
* End input action for batch source.
*/
public void endInput() {
List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void initWriterHelper(String clusteringInstantTime) {
if (this.writerHelper == null) {
this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
this.instantTime = clusteringInstantTime;
}
}
/**
* Read records from baseFiles, apply updates and convert to Iterator.
*/
@SuppressWarnings("unchecked")
private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps, String instantTime) {
List<Iterator<RowData>> recordIterators = new ArrayList<>();
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), writeConfig);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
for (ClusteringOperation clusteringOp : clusteringOps) {
try {
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
.withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
.build();
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
HoodieFileSliceReader<? extends IndexedRecord> hoodieFileSliceReader = HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
tableConfig.getPayloadClass(),
tableConfig.getPreCombineField(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())));
recordIterators.add(StreamSupport.stream(Spliterators.spliteratorUnknownSize(hoodieFileSliceReader, Spliterator.NONNULL), false).map(hoodieRecord -> {
try {
return this.transform((IndexedRecord) hoodieRecord.getData().getInsertValue(readerSchema).get());
} catch (IOException e) {
throw new HoodieIOException("Failed to read next record", e);
}
}).iterator());
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
}
return new ConcatenatingIterator<>(recordIterators);
}
/**
* Read records from baseFiles and get iterator.
*/
private Iterator<RowData> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
Iterable<IndexedRecord> indexedRecords = () -> {
try {
return HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
};
return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
}).collect(Collectors.toList());
return new ConcatenatingIterator<>(iteratorsForPartition);
}
/**
 * Transform IndexedRecord into RowData.
*/
private RowData transform(IndexedRecord indexedRecord) {
GenericRecord record = buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new GenericRecordBuilder(schema));
return (RowData) avroToRowDataConverter.convert(record);
}
private int[] getRequiredPositions() {
final List<String> fieldNames = readerSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
return schema.getFields().stream()
.map(field -> fieldNames.indexOf(field.name()))
.mapToInt(i -> i)
.toArray();
}
private SortCodeGenerator createSortCodeGenerator() {
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(","));
return sortOperatorGen.createSortCodeGenerator();
}
@Override
public void setKeyContextElement(StreamRecord<ClusteringPlanEvent> record) throws Exception {
OneInputStreamOperator.super.setKeyContextElement(record);
}
}
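For orientation, a minimal sketch of how the two read paths above are typically dispatched; the method name readRecordsForGroup is an assumption, since the dispatching code sits outside this excerpt:

private Iterator<RowData> readRecordsForGroup(List<ClusteringOperation> clusteringOps, String instantTime) {
// Fall back to the merged log-record path as soon as any operation carries delta log files.
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
return hasLogFiles
? readRecordsForGroupWithLogs(clusteringOps, instantTime)
: readRecordsForGroupBaseFiles(clusteringOps);
}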

View File

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import java.io.Serializable;
import java.util.Map;
/**
* Represents a clustering command emitted by the clustering plan source {@link ClusteringPlanSourceFunction}.
*/
public class ClusteringPlanEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String clusteringInstantTime;
private ClusteringGroupInfo clusteringGroupInfo;
private Map<String, String> strategyParams;
public ClusteringPlanEvent() {
}
public ClusteringPlanEvent(
String instantTime,
ClusteringGroupInfo clusteringGroupInfo,
Map<String, String> strategyParams) {
this.clusteringInstantTime = instantTime;
this.clusteringGroupInfo = clusteringGroupInfo;
this.strategyParams = strategyParams;
}
public void setClusteringInstantTime(String clusteringInstantTime) {
this.clusteringInstantTime = clusteringInstantTime;
}
public void setClusteringGroupInfo(ClusteringGroupInfo clusteringGroupInfo) {
this.clusteringGroupInfo = clusteringGroupInfo;
}
public void setStrategyParams(Map<String, String> strategyParams) {
this.strategyParams = strategyParams;
}
public String getClusteringInstantTime() {
return clusteringInstantTime;
}
public ClusteringGroupInfo getClusteringGroupInfo() {
return clusteringGroupInfo;
}
public Map<String, String> getStrategyParams() {
return strategyParams;
}
}

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Flink hudi clustering source function.
*
* <p>This function reads the clustering plan as {@link ClusteringOperation}s and assigns each clustering task
* event {@link ClusteringPlanEvent} to the downstream operators.
*
* <p>The clustering instant time is specified explicitly with strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
* as the instant time;</li>
* <li>If the timeline has inflight instants,
* use the median instant time between [last complete instant time, earliest inflight instant time]
* as the instant time.</li>
* </ul>
*/
public class ClusteringPlanSourceFunction extends AbstractRichFunction implements SourceFunction<ClusteringPlanEvent> {
protected static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanSourceFunction.class);
/**
* The clustering plan.
*/
private final HoodieClusteringPlan clusteringPlan;
/**
* Hoodie instant.
*/
private final HoodieInstant instant;
public ClusteringPlanSourceFunction(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
this.instant = instant;
this.clusteringPlan = clusteringPlan;
}
@Override
public void open(Configuration parameters) throws Exception {
// no operation
}
@Override
public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
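// Fan out the plan: every input group becomes one ClusteringPlanEvent, so the
// downstream operator parallelism is naturally bounded by the number of groups.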
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files");
sourceContext.collect(new ClusteringPlanEvent(this.instant.getTimestamp(), ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
}
}
@Override
public void close() throws Exception {
// no operation
}
@Override
public void cancel() {
// no operation
}
}

View File

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.configuration.FlinkOptions;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
/**
* Configurations for Hoodie Flink clustering.
*/
public class FlinkClusteringConfig extends Configuration {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
// ------------------------------------------------------------------------
// Hudi Write Options
// ------------------------------------------------------------------------
@Parameter(names = {"--path"}, description = "Base path for the target hoodie table.", required = true)
public String path;
// ------------------------------------------------------------------------
// Clustering Options
// ------------------------------------------------------------------------
@Parameter(names = {"--clustering-delta-commits"}, description = "Max delta commits needed to trigger clustering, default 4 commits", required = false)
public Integer clusteringDeltaCommits = 1;
@Parameter(names = {"--clustering-tasks"}, description = "Parallelism of tasks that do actual clustering, default is -1", required = false)
public Integer clusteringTasks = -1;
@Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
public Integer compactionMaxMemory = 100;
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10",
required = false)
public Integer cleanRetainCommits = 10;
@Parameter(names = {"--archive-min-commits"},
description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
required = false)
public Integer archiveMinCommits = 20;
@Parameter(names = {"--archive-max-commits"},
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
required = false)
public Integer archiveMaxCommits = 30;
@Parameter(names = {"--schedule", "-sc"}, description = "Not recommended. Schedule the clustering plan in this job.\n"
+ "There is a risk of losing data when scheduling clustering outside the writer job.\n"
+ "Scheduling clustering in the writer job and only let this job do the clustering execution is recommended.\n"
+ "Default is true", required = false)
public Boolean schedule = true;
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
public Boolean cleanAsyncEnable = false;
@Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
@Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
@Parameter(names = {"--small-file-limit"}, description = "Files smaller than the size specified here are candidates for clustering, default 600 MB", required = false)
public Integer smallFileLimit = 600;
@Parameter(names = {"--skip-from-latest-partitions"}, description = "Number of partitions to skip from latest when choosing partitions to create ClusteringPlan, default 0", required = false)
public Integer skipFromLatestPartitions = 0;
@Parameter(names = {"--sort-columns"}, description = "Columns to sort the data by when clustering.", required = false)
public String sortColumns = "";
@Parameter(names = {"--max-num-groups"}, description = "Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. default 30", required = false)
public Integer maxNumGroups = 30;
@Parameter(names = {"--target-partitions"}, description = "Number of partitions to list to create ClusteringPlan, default 2", required = false)
public Integer targetPartitions = 2;
public static final String SEQ_FIFO = "FIFO";
public static final String SEQ_LIFO = "LIFO";
@Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
+ "1). FIFO: execute the oldest plan first;\n"
+ "2). LIFO: execute the latest plan first, by default LIFO", required = false)
public String clusteringSeq = SEQ_LIFO;
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
public Boolean writePartitionUrlEncode = false;
@Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)")
public Boolean hiveStylePartitioning = false;
/**
* Transforms a {@code FlinkClusteringConfig} into a Flink {@code Configuration}.
* The latter is more suitable for the table APIs.
*/
public static Configuration toFlinkConfig(FlinkClusteringConfig config) {
Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, config.path);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, config.sortColumns);
conf.setInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS, config.maxNumGroups);
conf.setInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS, config.targetPartitions);
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
// whether to schedule the clustering plan in this job
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule);
// bulk insert conf
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
return conf;
}
}
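As a usage illustration, a minimal sketch of parsing these options and handing them to the table APIs; the table location and sort column below are hypothetical placeholders, not values from this patch:

import org.apache.hudi.configuration.FlinkOptions;
import com.beust.jcommander.JCommander;
import org.apache.flink.configuration.Configuration;

public class FlinkClusteringConfigExample {
public static void main(String[] args) {
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
// "--path" is the only required option; the location is a placeholder.
new JCommander(cfg, null, "--path", "file:///tmp/hoodie_table", "--sort-columns", "uuid");
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
// The CLI options are now ordinary Flink configuration entries.
System.out.println(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
}
}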

View File

@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Flink hudi clustering program that can be executed manually.
*/
public class HoodieFlinkClusteringJob {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
// create metaClient
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
// set table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
// set table type
conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
// set partition field
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// schedule the clustering plan when requested:
// compute a fresh clustering instant time and try to schedule a plan at it.
if (cfg.schedule) {
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No clustering plan for this job ");
return;
}
}
table.getMetaClient().reloadActiveTimeline();
// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant();
if (!requested.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
}
HoodieInstant clusteringInstant = requested.get();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
if (table.getActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
writeClient.rollbackInflightClustering(inflightInstant, table);
table.getMetaClient().reloadActiveTimeline();
}
// fetch the clustering plan
// should support configurable commit metadata
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), clusteringInstant);
if (!clusteringPlanOption.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
}
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// No clustering plan, do nothing and return.
LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
return;
}
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
if (!pendingClusteringTimeline.containsInstant(instant)) {
// this means that the clustering plan was written to the auxiliary path (.tmp)
// but not the meta path (.hoodie), which usually happens when the job crashes
// unexpectedly.
// clean the clustering plan in the auxiliary path and cancel the clustering.
LOG.warn("The clustering plan was fetched through the auxiliary path (.tmp) but not the meta path (.hoodie).\n"
+ "Clean the clustering plan in the auxiliary path and cancel the clustering");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
}
// get clusteringParallelism.
int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
// setup configuration
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
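// Pipeline shape: plan source -> rebalance -> clustering operator -> single-task commit sink.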
DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant, clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(clusteringParallelism);
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
dataStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1);
env.execute("flink_hudi_clustering");
}
}
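A minimal sketch of invoking the job programmatically; the table path is a hypothetical placeholder:

public class HoodieFlinkClusteringJobExample {
public static void main(String[] args) throws Exception {
// Schedules a clustering plan (--schedule) and executes it against the placeholder table path.
HoodieFlinkClusteringJob.main(new String[] {"--path", "file:///tmp/hoodie_table", "--schedule"});
}
}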

View File

@@ -192,7 +192,7 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean indexGlobalEnabled = true;
@Parameter(names = {"--index-partition-regex"},
description = "Whether to load partitions in state if partition path matching, default *")
public String indexPartitionRegex = ".*";
@Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -162,6 +164,17 @@ public class StreamerUtil {
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
.withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf))
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
.withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
.withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
.withClusteringPlanSmallFileLimit(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L)
.withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
.withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
.build())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
@@ -505,6 +518,11 @@ public class StreamerUtil {
* Returns the max compaction memory in bytes with given conf.
*/
public static long getMaxCompactionMemoryInBytes(Configuration conf) {
return (long) conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
}
public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
return schemaUtil.getTableAvroSchema(includeMetadataFields);
}
}
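For context on the (long) cast added above: the previous expression multiplied three ints, which overflows once the configured memory exceeds 2047 MB. A minimal demonstration:

int compactionMaxMemoryMb = 4096;                          // hypothetical --compaction-max-memory value in MB
long overflowed = compactionMaxMemoryMb * 1024 * 1024;     // int math wraps: 4096 * 2^20 == 2^32 becomes 0
long correct = (long) compactionMaxMemoryMb * 1024 * 1024; // promoted to long: 4294967296 bytes as intended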

View File

@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.cluster;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanSourceFunction;
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* IT cases for {@link HoodieFlinkClusteringJob}.
*/
public class ITTestHoodieFlinkClustering {
private static final Map<String, String> EXPECTED = new HashMap<>();
static {
EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
}
@TempDir
File tempFile;
@Test
public void testHoodieFlinkClustering() throws Exception {
// Create hoodie table and insert data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
// use append mode
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);
// Build the configuration and set the Avro schema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
cfg.path = tempFile.getAbsolutePath();
cfg.targetPartitions = 4;
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
// create metaClient
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
// set the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
// set partition field
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
// schedule the clustering plan:
// compute the clustering instant time and trigger the clustering.
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
assertTrue(scheduled, "The clustering plan should be scheduled");
// fetch the instant based on the configured execution sequence
table.getMetaClient().reloadActiveTimeline();
HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
// fetch the clustering plan
// should support configurable commit metadata
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), timeline.lastInstant().get());
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
// Mark instant as clustering inflight
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(clusteringPlan.getInputGroups().size());
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
dataStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1);
env.execute("flink_hudi_clustering");
TestData.checkWrittenData(tempFile, EXPECTED, 4);
}
}