1
0

[HUDI-4273] Support inline schedule clustering for Flink stream (#5890)

* [HUDI-4273] Support inline schedule clustering for Flink stream

* delete deprecated clustering plan strategy and add clustering IT test
This commit is contained in:
Zhaojing Yu
2022-06-24 11:28:06 +08:00
committed by GitHub
parent af9f09047d
commit 6456bd3a51
29 changed files with 1116 additions and 385 deletions

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.configuration;
import org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy;
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
@@ -45,6 +45,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST;
/**
* Hoodie Flink config options.
*
@@ -594,6 +599,12 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(false) // default false for pipeline
.withDescription("Schedule the cluster plan, default false");
// Whether to run clustering asynchronously inside the write pipeline; disabled by default.
// Complements 'clustering.schedule.enabled', which only schedules the plan.
public static final ConfigOption<Boolean> CLUSTERING_ASYNC_ENABLED = ConfigOptions
.key("clustering.async.enabled")
.booleanType()
.defaultValue(false) // default false for pipeline
.withDescription("Async Clustering, default false");
public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions
.key("clustering.delta_commits")
.intType()
@@ -615,11 +626,22 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> CLUSTERING_PLAN_STRATEGY_CLASS = ConfigOptions
.key("clustering.plan.strategy.class")
.stringType()
.defaultValue(FlinkRecentDaysClusteringPlanStrategy.class.getName())
.defaultValue(FlinkSizeBasedClusteringPlanStrategy.class.getName())
.withDescription("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ "i.e select what file groups are being clustered. Default strategy, looks at the last N (determined by "
+ CLUSTERING_TARGET_PARTITIONS.key() + ") day based partitions picks the small file slices within those partitions.");
// Partition filter mode used when generating the clustering plan; mirrors the
// Spark-side option in HoodieClusteringConfig (see the static imports above).
// Fixes the original description text: the quote after the skip-partitions key
// was unbalanced and the mode sentences ran together without separators.
public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
    .key("clustering.plan.partition.filter.mode")
    .stringType()
    .defaultValue("NONE")
    .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
        + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate. "
        + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
        + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "'. "
        + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
        + PARTITION_FILTER_END_PARTITION.key() + "'].");
public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
.key("clustering.plan.strategy.target.file.max.bytes")
.intType()
@@ -641,7 +663,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
.key("clustering.plan.strategy.sort.columns")
.stringType()
.noDefaultValue()
.defaultValue("")
.withDescription("Columns to sort the data by when clustering");
public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.format.FilePathUtils;
@@ -42,7 +43,10 @@ public class OptionsResolver {
* Returns whether the insert is clustering disabled with given configuration {@code conf}.
*/
public static boolean isAppendMode(Configuration conf) {
return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER);
// 1. inline clustering is supported for COW tables;
// 2. async clustering is supported for both COW and MOR tables
return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)
|| needsScheduleClustering(conf);
}
/**
@@ -115,4 +119,49 @@ public class OptionsResolver {
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
&& conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
}
/**
 * Returns whether there is need to run the compaction asynchronously,
 * i.e. the table is MOR and async compaction is switched on.
 *
 * @param conf The flink configuration.
 */
public static boolean needsAsyncCompaction(Configuration conf) {
  // Compaction only applies to MOR tables.
  final boolean morTable = isMorTable(conf);
  return morTable && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}
/**
 * Returns whether there is need to schedule the compaction plan,
 * i.e. the table is MOR and compaction scheduling is switched on.
 *
 * @param conf The flink configuration.
 */
public static boolean needsScheduleCompaction(Configuration conf) {
  // Compaction scheduling only applies to MOR tables.
  if (!isMorTable(conf)) {
    return false;
  }
  return conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}
/**
 * Returns whether there is need to run the clustering asynchronously,
 * i.e. the operation is INSERT and async clustering is switched on.
 *
 * @param conf The flink configuration.
 */
public static boolean needsAsyncClustering(Configuration conf) {
  // Clustering only applies to INSERT workloads.
  if (!isInsertOperation(conf)) {
    return false;
  }
  return conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
}
/**
 * Returns whether there is need to schedule the clustering plan,
 * i.e. the operation is INSERT and clustering scheduling is switched on.
 *
 * @param conf The flink configuration.
 */
public static boolean needsScheduleClustering(Configuration conf) {
  // Clustering only applies to INSERT workloads.
  if (!isInsertOperation(conf)) {
    return false;
  }
  return conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
}
/**
 * Returns whether the clustering sort is enabled,
 * i.e. whether any clustering sort columns are configured.
 */
public static boolean sortClusteringEnabled(Configuration conf) {
  final String sortColumns = conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS);
  return !StringUtils.isNullOrEmpty(sortColumns);
}
}

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
@@ -253,6 +254,11 @@ public class StreamWriteOperatorCoordinator
CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
}
if (tableState.scheduleClustering) {
// if async clustering is on, schedule the clustering
ClusteringUtil.scheduleClustering(conf, writeClient, committed);
}
if (committed) {
// start new instant.
startInstant();
@@ -607,6 +613,7 @@ public class StreamWriteOperatorCoordinator
final String commitAction;
final boolean isOverwrite;
final boolean scheduleCompaction;
final boolean scheduleClustering;
final boolean syncHive;
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
@@ -616,7 +623,8 @@ public class StreamWriteOperatorCoordinator
this.commitAction = CommitUtils.getCommitActionType(this.operationType,
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);

View File

@@ -24,7 +24,7 @@ import java.io.Serializable;
import java.util.List;
/**
* Represents a commit event from the clustering task {@link ClusteringFunction}.
* Represents a commit event from the clustering task {@link ClusteringOperator}.
*/
public class ClusteringCommitEvent implements Serializable {
private static final long serialVersionUID = 1L;
@@ -51,6 +51,10 @@ public class ClusteringCommitEvent implements Serializable {
this.taskID = taskID;
}
/**
 * Creates an event with {@code null} write statuses, used to signal
 * a failed clustering task (see {@link #isFailed()}).
 */
public ClusteringCommitEvent(String instant, int taskID) {
this(instant, null, taskID);
}
public void setInstant(String instant) {
this.instant = instant;
}
@@ -74,4 +78,8 @@ public class ClusteringCommitEvent implements Serializable {
public int getTaskID() {
return taskID;
}
// A null write statuses list marks the event as failed; the failure
// constructor ClusteringCommitEvent(String, int) produces such events.
public boolean isFailed() {
return this.writeStatuses == null;
}
}

View File

@@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
@@ -115,6 +116,30 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
if (!isReady) {
return;
}
if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) {
try {
// handle failure case
CompactionUtil.rollbackCompaction(table, instant);
} finally {
// remove commitBuffer to avoid obsolete metadata commit
reset(instant);
}
return;
}
try {
doCommit(instant, clusteringPlan, events);
} catch (Throwable throwable) {
// make it fail-safe
LOG.error("Error while committing clustering instant: " + instant, throwable);
} finally {
// reset the status
reset(instant);
}
}
private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<ClusteringCommitEvent> events) {
List<WriteStatus> statuses = events.stream()
.map(ClusteringCommitEvent::getWriteStatuses)
.flatMap(Collection::stream)
@@ -139,9 +164,6 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
this.table.getMetaClient().reloadActiveTimeline();
this.writeClient.completeTableService(
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
// reset the status
reset(instant);
}
private void reset(String instant) {

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
@@ -40,7 +41,9 @@ import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.StreamerUtil;
@@ -48,11 +51,13 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -100,15 +105,31 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
private transient HoodieFlinkWriteClient writeClient;
private transient BulkInsertWriterHelper writerHelper;
private transient String instantTime;
private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<ClusteringCommitEvent> collector;
private transient BinaryRowDataSerializer binarySerializer;
/**
* Whether to execute clustering asynchronously.
*/
private final boolean asyncClustering;
/**
* Whether the clustering sort is enabled.
*/
private final boolean sortClusteringEnabled;
/**
* Executor service to execute the clustering task.
*/
private transient NonThrownExecutor executor;
public ClusteringOperator(Configuration conf, RowType rowType) {
this.conf = conf;
this.rowType = rowType;
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
}
@Override
@@ -120,46 +141,61 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.table = writeClient.getHoodieTable();
this.schema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
this.schema = AvroSchemaConverter.convertToSchema(rowType);
this.readerSchema = HoodieAvroUtils.addMetadataFields(this.schema);
this.requiredPos = getRequiredPositions();
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
if (this.sortClusteringEnabled) {
initSorter();
}
AbstractRowDataSerializer inputSerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
this.sorter =
new BinaryExternalSorter(
this.getContainingTask(),
memManager,
computeMemorySize(),
this.getContainingTask().getEnvironment().getIOManager(),
inputSerializer,
binarySerializer,
computer,
comparator,
getContainingTask().getJobConfiguration());
this.sorter.startThreads();
if (this.asyncClustering) {
this.executor = NonThrownExecutor.builder(LOG).build();
}
collector = new StreamRecordCollector<>(output);
// register the metrics.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}
@Override
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
ClusteringPlanEvent event = element.getValue();
final String instantTime = event.getClusteringInstantTime();
if (this.asyncClustering) {
// executes the clustering task asynchronously so as not to block the checkpoint barrier propagation;
// on error, emits a commit event without write statuses so the commit sink can detect the failure.
executor.execute(
() -> doClustering(instantTime, event),
(errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, taskID)),
"Execute clustering for instant %s from task %d", instantTime, taskID);
} else {
// executes the clustering task synchronously for batch mode.
LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);
doClustering(instantTime, event);
}
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
/**
* End input action for batch source.
*/
public void endInput() {
// no operation
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();
initWriterHelper(instantTime);
@@ -177,44 +213,33 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
}
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
this.sorter.write(binaryRowData);
if (this.sortClusteringEnabled) {
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
this.sorter.write(binaryRowData);
}
BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
this.writerHelper.write(row);
}
} else {
while (iterator.hasNext()) {
this.writerHelper.write(iterator.next());
}
}
BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
this.writerHelper.write(row);
}
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
/**
* End input action for batch source.
*/
public void endInput() {
List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void initWriterHelper(String clusteringInstantTime) {
if (this.writerHelper == null) {
this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
this.instantTime = clusteringInstantTime;
}
}
@@ -305,14 +330,44 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
.toArray();
}
/**
 * Initializes the external binary sorter used when clustering sort columns
 * are configured.
 *
 * <p>The normalized-key computer and record comparator are generated code,
 * instantiated with the user code class loader; the sorter's memory budget
 * comes from {@code computeMemorySize()} — presumably the task's managed
 * memory fraction (TODO confirm against TableStreamOperator).
 */
private void initSorter() {
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
this.sorter =
new BinaryExternalSorter(
this.getContainingTask(),
memManager,
computeMemorySize(),
this.getContainingTask().getEnvironment().getIOManager(),
(AbstractRowDataSerializer) binarySerializer,
binarySerializer,
computer,
comparator,
getContainingTask().getJobConfiguration());
this.sorter.startThreads();
// register the metrics exposed by the sorter.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}
// Builds a sort code generator over the configured clustering sort columns
// (comma-separated list from 'clustering.plan.strategy.sort.columns').
private SortCodeGenerator createSortCodeGenerator() {
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS).split(","));
return sortOperatorGen.createSortCodeGenerator();
}
@Override
public void setKeyContextElement(StreamRecord<ClusteringPlanEvent> record) throws Exception {
OneInputStreamOperator.super.setKeyContextElement(record);
// Test hook: injects the executor that runs the async clustering tasks.
@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
}
// Test hook: injects the output used to emit clustering commit events.
@VisibleForTesting
public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
this.output = output;
}
}

View File

@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.clustering;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
 * Operator that generates the clustering plan with pluggable strategies on finished checkpoints.
 *
 * <p>It should be singleton to avoid conflicts.
 */
public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPlanEvent>
implements OneInputStreamOperator<Object, ClusteringPlanEvent> {
/**
 * Config options.
 */
private final Configuration conf;
/**
 * Meta Client.
 */
@SuppressWarnings("rawtypes")
private transient HoodieFlinkTable table;
public ClusteringPlanOperator(Configuration conf) {
this.conf = conf;
}
@Override
public void open() throws Exception {
super.open();
this.table = FlinkTables.createTable(conf, getRuntimeContext());
// when starting up, rolls back all the inflight clustering instants if any exist;
// these instants are in priority for the scheduling task because the clustering
// instants are scheduled from the earliest (FIFO sequence).
ClusteringUtil.rollbackClustering(table, StreamerUtil.createWriteClient(conf, getRuntimeContext()));
}
@Override
public void processElement(StreamRecord<Object> streamRecord) {
// no operation: plans are generated in notifyCheckpointComplete, not per record.
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
try {
table.getMetaClient().reloadActiveTimeline();
scheduleClustering(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe: a plan scheduling failure must not fail the pipeline.
LOG.error("Error while scheduling clustering plan for checkpoint: " + checkpointId, throwable);
}
}
/**
 * Picks the earliest REQUESTED clustering instant, transitions it to inflight
 * and emits one {@link ClusteringPlanEvent} per input group of its plan.
 */
private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
// the first instant takes the highest priority.
Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No clustering plan for checkpoint " + checkpointId);
return;
}
String clusteringInstantTime = firstRequested.get().getTimestamp();
// generate clustering plan
// should support configurable commit metadata
HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), clusteringInstant);
if (!clusteringPlanOption.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled");
return;
}
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// do nothing.
LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
} else {
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
output.collect(new StreamRecord<>(
new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())
));
}
}
}
// Test hook: injects the output used to emit clustering plan events.
@VisibleForTesting
public void setOutput(Output<StreamRecord<ClusteringPlanEvent>> output) {
this.output = output;
}
}

View File

@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
@@ -57,12 +56,12 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement
private final HoodieClusteringPlan clusteringPlan;
/**
* Hoodie instant.
* Clustering instant time.
*/
private final HoodieInstant instant;
private final String clusteringInstantTime;
public ClusteringPlanSourceFunction(HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
this.instant = instant;
public ClusteringPlanSourceFunction(String clusteringInstantTime, HoodieClusteringPlan clusteringPlan) {
this.clusteringInstantTime = clusteringInstantTime;
this.clusteringPlan = clusteringPlan;
}
@@ -74,8 +73,8 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement
@Override
public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception {
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
LOG.info("ClusteringPlanSourceFunction cluster " + clusteringGroup + " files");
sourceContext.collect(new ClusteringPlanEvent(this.instant.getTimestamp(), ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
sourceContext.collect(new ClusteringPlanEvent(this.clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams()));
}
}

View File

@@ -18,7 +18,10 @@
package org.apache.hudi.sink.clustering;
import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
@@ -76,7 +79,10 @@ public class FlinkClusteringConfig extends Configuration {
public Boolean cleanAsyncEnable = false;
@Parameter(names = {"--plan-strategy-class"}, description = "Config to provide a strategy class to generator clustering plan", required = false)
public String planStrategyClass = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
public String planStrategyClass = FlinkSizeBasedClusteringPlanStrategy.class.getName();
@Parameter(names = {"--plan-partition-filter-mode"}, description = "Partition filter mode used in the creation of clustering plan", required = false)
public String planPartitionFilterMode = "NONE";
@Parameter(names = {"--target-file-max-bytes"}, description = "Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB", required = false)
public Integer targetFileMaxBytes = 1024 * 1024 * 1024;
@@ -100,16 +106,15 @@ public class FlinkClusteringConfig extends Configuration {
public static final String SEQ_LIFO = "LIFO";
@Parameter(names = {"--seq"}, description = "Clustering plan execution sequence, two options are supported:\n"
+ "1). FIFO: execute the oldest plan first;\n"
+ "2). LIFO: execute the latest plan first, by default LIFO", required = false)
public String clusteringSeq = SEQ_LIFO;
+ "2). LIFO: execute the latest plan first, by default FIFO", required = false)
public String clusteringSeq = SEQ_FIFO;
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
public Boolean writePartitionUrlEncode = false;
@Parameter(names = {"--service"}, description = "Flink Clustering runs in service mode, disable by default")
public Boolean serviceMode = false;
@Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+ "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+ "By default false (the names of partition folders are only partition values)")
public Boolean hiveStylePartitioning = false;
@Parameter(names = {"--min-clustering-interval-seconds"},
description = "Min clustering interval of async clustering service, default 10 minutes")
public Integer minClusteringIntervalSeconds = 600;
/**
* Transforms a {@code FlinkClusteringConfig.config} into {@code Configuration}.
@@ -128,6 +133,7 @@ public class FlinkClusteringConfig extends Configuration {
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, config.clusteringDeltaCommits);
conf.setInteger(FlinkOptions.CLUSTERING_TASKS, config.clusteringTasks);
conf.setString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS, config.planStrategyClass);
conf.setString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME, config.planPartitionFilterMode);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES, config.targetFileMaxBytes);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT, config.smallFileLimit);
conf.setInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST, config.skipFromLatestPartitions);
@@ -137,11 +143,13 @@ public class FlinkClusteringConfig extends Configuration {
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
// use synchronous clustering always
conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, config.schedule);
// bulk insert conf
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
HoodieTableConfig tableConfig = StreamerUtil.createMetaClient(conf).getTableConfig();
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning()));
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable()));
return conf;
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.sink.clustering;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,13 +29,16 @@ import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -45,6 +49,12 @@ import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* Flink hudi clustering program that can be executed manually.
*/
@@ -52,141 +62,278 @@ public class HoodieFlinkClusteringJob {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
/**
* Flink Execution Environment.
*/
private final AsyncClusteringService clusteringScheduleService;
public HoodieFlinkClusteringJob(AsyncClusteringService service) {
this.clusteringScheduleService = service;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = getFlinkClusteringConfig(args);
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env);
new HoodieFlinkClusteringJob(service).start(cfg.serviceMode);
}
/**
* Main method to start clustering service.
*/
public void start(boolean serviceMode) throws Exception {
if (serviceMode) {
clusteringScheduleService.start(null);
try {
clusteringScheduleService.waitForShutdown();
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
} finally {
LOG.info("Shut down hoodie flink clustering");
}
} else {
LOG.info("Hoodie Flink Clustering running only single round");
try {
clusteringScheduleService.cluster();
} catch (Exception e) {
LOG.error("Got error running delta sync once. Shutting down", e);
throw e;
} finally {
LOG.info("Shut down hoodie flink clustering");
}
}
}
public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) {
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
return cfg;
}
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
// create metaClient
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
/**
* Schedules clustering in service.
*/
public static class AsyncClusteringService extends HoodieAsyncTableService {
private static final long serialVersionUID = 1L;
// set table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
/**
* Flink Clustering Config.
*/
private final FlinkClusteringConfig cfg;
// set table type
conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
/**
* Flink Config.
*/
private final Configuration conf;
// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
/**
* Meta Client.
*/
private final HoodieTableMetaClient metaClient;
// set partition field
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
/**
* Write Client.
*/
private final HoodieFlinkWriteClient<?> writeClient;
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
/**
* The hoodie table.
*/
private final HoodieFlinkTable<?> table;
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
/**
* Flink Execution Environment.
*/
private final StreamExecutionEnvironment env;
// judge whether have operation
// to compute the clustering instant time and do cluster.
if (cfg.schedule) {
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
if (!scheduled) {
/**
* Executor Service.
*/
private final ExecutorService executor;
public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception {
this.cfg = cfg;
this.conf = conf;
this.env = env;
this.executor = Executors.newFixedThreadPool(1);
// create metaClient
this.metaClient = StreamerUtil.createMetaClient(conf);
// set table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
// set table type
conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
// set partition field
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
this.writeClient = StreamerUtil.createWriteClient(conf);
this.writeConfig = writeClient.getConfig();
this.table = writeClient.getHoodieTable();
}
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
return Pair.of(CompletableFuture.supplyAsync(() -> {
boolean error = false;
try {
while (!isShutdownRequested()) {
try {
cluster();
Thread.sleep(cfg.minClusteringIntervalSeconds * 1000);
} catch (Exception e) {
LOG.error("Shutting down clustering service due to exception", e);
error = true;
throw new HoodieException(e.getMessage(), e);
}
}
} finally {
shutdownAsyncService(error);
}
return true;
}, executor), executor);
}
private void cluster() throws Exception {
table.getMetaClient().reloadActiveTimeline();
// judges whether there are operations
// to compute the clustering instant time and exec clustering.
if (cfg.schedule) {
ClusteringUtil.validateClusteringScheduling(conf);
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No clustering plan for this job");
return;
}
table.getMetaClient().reloadActiveTimeline();
}
// fetch the instant based on the configured execution sequence
List<HoodieInstant> instants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient()).stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).collect(Collectors.toList());
if (instants.isEmpty()) {
// do nothing.
LOG.info("No clustering plan for this job ");
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
}
HoodieInstant clusteringInstant = CompactionUtil.isLIFO(cfg.clusteringSeq) ? instants.get(instants.size() - 1) : instants.get(0);
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
if (table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
table.rollbackInflightClustering(inflightInstant,
commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}
// generate clustering plan
// should support configurable commit metadata
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), clusteringInstant);
if (!clusteringPlanOption.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
}
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// No clustering plan, do nothing and return.
LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
return;
}
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
if (!pendingClusteringTimeline.containsInstant(instant)) {
// this means that the clustering plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie), this usually happens when the job crush
// exceptionally.
// clean the clustering plan in auxiliary path and cancels the clustering.
LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the clustering plan in auxiliary path and cancels the clustering");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
}
// get clusteringParallelism.
int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
// setup configuration
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstant.getTimestamp(), clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(clusteringParallelism);
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
dataStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1);
env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp());
}
table.getMetaClient().reloadActiveTimeline();
// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant();
if (!requested.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
/**
* Shutdown async services like compaction/clustering as DeltaSync is shutdown.
*/
public void shutdownAsyncService(boolean error) {
LOG.info("Gracefully shutting down clustering job. Error ?" + error);
executor.shutdown();
writeClient.close();
}
HoodieInstant clusteringInstant = requested.get();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]");
table.rollbackInflightClustering(inflightInstant,
commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
@VisibleForTesting
public void shutDown() {
shutdownAsyncService(false);
}
// generate clustering plan
// should support configurable commit metadata
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), clusteringInstant);
if (!clusteringPlanOption.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
return;
}
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// No clustering plan, do nothing and return.
LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp());
return;
}
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
if (!pendingClusteringTimeline.containsInstant(instant)) {
// this means that the clustering plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie), this usually happens when the job crush
// exceptionally.
// clean the clustering plan in auxiliary path and cancels the clustering.
LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the clustering plan in auxiliary path and cancels the clustering");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
}
// get clusteringParallelism.
int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
// setup configuration
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(clusteringPlan.getInputGroups().size());
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
dataStream
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.uid("uid_clustering_commit")
.setParallelism(1);
env.execute("flink_hudi_clustering");
}
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -72,7 +73,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
public CompactFunction(Configuration conf) {
this.conf = conf;
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
}
@Override

View File

@@ -135,8 +135,8 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
} finally {
// remove commitBuffer to avoid obsolete metadata commit
reset(instant);
return;
}
return;
}
try {

View File

@@ -32,6 +32,11 @@ import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
@@ -114,7 +119,7 @@ public class Pipelines {
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
return dataStream
.transform(writeOpIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
.transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE)
@@ -146,7 +151,7 @@ public class Pipelines {
}
}
return dataStream
.transform(writeOpIdentifier("hoodie_bulk_insert_write", conf),
.transform(opIdentifier("hoodie_bulk_insert_write", conf),
TypeInformation.of(Object.class),
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
@@ -177,7 +182,7 @@ public class Pipelines {
* @param bounded Whether the input stream is bounded
* @return the appending data stream sink
*/
public static DataStreamSink<Object> append(
public static DataStream<Object> append(
Configuration conf,
RowType rowType,
DataStream<RowData> dataStream,
@@ -190,11 +195,9 @@ public class Pipelines {
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
return dataStream
.transform(writeOpIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
.transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE)
.name("dummy");
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
/**
@@ -322,7 +325,7 @@ public class Pipelines {
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform(writeOpIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
@@ -338,7 +341,7 @@ public class Pipelines {
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform(writeOpIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
@@ -379,13 +382,53 @@ public class Pipelines {
.setParallelism(1); // compaction commit should be singleton
}
/**
* The clustering tasks pipeline.
*
* <p>The clustering plan operator monitors the new clustering plan on the timeline
* then distributes the sub-plans to the clustering tasks. The clustering task then
* handle over the metadata to commit task for clustering transaction commit.
* The whole pipeline looks like the following:
*
* <pre>
* /=== | task1 | ===\
* | plan generation | ===> re-balance | commit |
* \=== | task2 | ===/
*
* Note: both the clustering plan generation task and commission task are singleton.
* </pre>
*
* @param conf The configuration
* @param rowType The input row type
* @param dataStream The input data stream
* @return the clustering pipeline
*/
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
return dataStream.transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // compaction commit should be singleton
}
public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
return dataStream.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
}
public static String writeOpIdentifier(String operatorN, Configuration conf) {
public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
return dataStream.addSink(Pipelines.DummySink.INSTANCE).name("dummy");
}
public static String opIdentifier(String operatorN, Configuration conf) {
return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
}

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -99,7 +100,7 @@ public class HoodieFlinkStreamer {
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
if (StreamerUtil.needsAsyncCompaction(conf)) {
if (OptionsResolver.needsAsyncCompaction(conf)) {
Pipelines.compact(conf, pipeline);
} else {
Pipelines.clean(conf, pipeline);

View File

@@ -24,7 +24,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -78,7 +77,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// Append mode
if (OptionsResolver.isAppendMode(conf)) {
return Pipelines.append(conf, rowType, dataStream, context.isBounded());
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
if (OptionsResolver.needsAsyncClustering(conf)) {
return Pipelines.cluster(conf, rowType, pipeline);
} else {
return Pipelines.dummySink(pipeline);
}
}
// default parallelism
@@ -90,7 +94,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// write pipeline
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
// compaction
if (StreamerUtil.needsAsyncCompaction(conf)) {
if (OptionsResolver.needsAsyncCompaction(conf)) {
return Pipelines.compact(conf, pipeline);
} else {
return Pipelines.clean(conf, pipeline);

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
/**
* Utilities for flink hudi clustering.
*/
public class ClusteringUtil {
private static final Logger LOG = LoggerFactory.getLogger(ClusteringUtil.class);
public static void validateClusteringScheduling(Configuration conf) {
if (OptionsResolver.isBucketIndexType(conf)) {
throw new UnsupportedOperationException("Clustering is not supported for bucket index.");
}
}
/**
* Schedules clustering plan by condition.
*
* @param conf The configuration
* @param writeClient The write client
* @param committed Whether the instant was committed
*/
public static void scheduleClustering(Configuration conf, HoodieFlinkWriteClient writeClient, boolean committed) {
validateClusteringScheduling(conf);
if (committed) {
writeClient.scheduleClustering(Option.empty());
}
}
/**
* Force rolls back all the inflight clustering instants, especially for job failover restart.
*
* @param table The hoodie table
* @param writeClient The write client
*/
public static void rollbackClustering(HoodieFlinkTable<?> table, HoodieFlinkWriteClient writeClient) {
List<HoodieInstant> inflightInstants = ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
.stream()
.filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
.collect(Collectors.toList());
inflightInstants.forEach(inflightInstant -> {
LOG.info("Rollback the inflight clustering instant: " + inflightInstant + " for failover");
table.rollbackInflightClustering(inflightInstant,
commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
});
}
}

View File

@@ -53,6 +53,7 @@ import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.avro.Schema;
@@ -168,6 +169,8 @@ public class StreamerUtil {
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
.withClusteringPlanPartitionFilterMode(
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
.withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS))
.withClusteringTargetFileMaxBytes(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES))
@@ -311,26 +314,6 @@ public class StreamerUtil {
return String.format("%s_%s", partitionPath, fileId);
}
/**
* Returns whether there is need to schedule the async compaction.
*
* @param conf The flink configuration.
*/
public static boolean needsAsyncCompaction(Configuration conf) {
return OptionsResolver.isMorTable(conf)
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
}
/**
* Returns whether there is need to schedule the compaction plan.
*
* @param conf The flink configuration.
*/
public static boolean needsScheduleCompaction(Configuration conf) {
return OptionsResolver.isMorTable(conf)
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}
/**
* Creates the meta client for reader.
*

View File

@@ -152,6 +152,17 @@ public class ITTestDataStreamWrite extends TestLogger {
testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED);
}
@Test
public void testWriteMergeOnReadWithClustering() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
}
private void testWriteToHoodie(
Transformer transformer,
String jobName,
@@ -250,6 +261,69 @@ public class ITTestDataStreamWrite extends TestLogger {
}
TestData.checkWrittenFullData(tempFile, expected);
}
private void testWriteToHoodieWithCluster(
Configuration conf,
String jobName,
int checkpoints,
Map<String, List<String>> expected) throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(4);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Read from file source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
);
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
DataStream<RowData> dataStream;
if (isMor) {
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
dataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(1);
} else {
dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);
}
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, true);
execEnv.addOperator(pipeline.getTransformation());
Pipelines.cluster(conf, rowType, pipeline);
JobClient client = execEnv.executeAsync(jobName);
// wait for the streaming job to finish
client.getJobExecutionResult().get();
TestData.checkWrittenFullData(tempFile, expected);
}
}

View File

@@ -286,6 +286,29 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
/**
 * Verifies inline (async) clustering for the INSERT operation: with
 * {@code clustering.schedule.enabled} and {@code clustering.async.enabled} on
 * and the delta-commit threshold set to 1, each completed checkpoint should
 * trigger a clustering round, and the written data should stay correct after
 * inserting the same keys twice.
 */
@Test
public void testInsertAsyncClustering() throws Exception {
// reset the config option: plain insert with clustering scheduled and executed asynchronously
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
// threshold of 1 delta commit — presumably clustering fires after every commit; TODO confirm against OptionsResolver
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
prepareInsertPipeline(conf)
.consume(TestData.DATA_SET_INSERT_SAME_KEY)
.checkpoint(1)
.handleEvents(1)
.checkpointComplete(1)
// first batch lands as-is
.checkWrittenData(EXPECTED4, 1)
// insert duplicates again
.consume(TestData.DATA_SET_INSERT_SAME_KEY)
.checkpoint(2)
.handleEvents(1)
.checkpointComplete(2)
// after the second round the full data set must match the expectation
.checkWrittenFullData(EXPECTED5)
.end();
}
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
@@ -419,7 +442,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
// -------------------------------------------------------------------------
private TestHarness preparePipeline() throws Exception {
return TestHarness.instance().preparePipeline(tempFile, conf);
return preparePipeline(conf);
}
protected TestHarness preparePipeline(Configuration conf) throws Exception {
@@ -427,6 +450,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
}
/**
 * Prepares an insert pipeline test harness with the default test configuration.
 */
protected TestHarness prepareInsertPipeline() throws Exception {
return prepareInsertPipeline(conf);
}
/**
 * Prepares an insert pipeline test harness with the given configuration.
 *
 * @param conf the Flink configuration to drive the pipeline with
 */
protected TestHarness prepareInsertPipeline(Configuration conf) throws Exception {
// the trailing 'true' flag selects the insert (append) pipeline variant
return TestHarness.instance().preparePipeline(tempFile, conf, true);
}

View File

@@ -115,7 +115,6 @@ public class ITTestHoodieFlinkClustering {
// set the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name());
// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
@@ -133,7 +132,7 @@ public class ITTestHoodieFlinkClustering {
// To compute the clustering instant time and do clustering.
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
@@ -160,7 +159,7 @@ public class ITTestHoodieFlinkClustering {
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
.name("clustering_source")
.uid("uid_clustering_source")
.rebalance()
@@ -181,4 +180,44 @@ public class ITTestHoodieFlinkClustering {
env.execute("flink_hudi_clustering");
TestData.checkWrittenData(tempFile, EXPECTED, 4);
}
/**
 * End-to-end test for the standalone async clustering service
 * ({@link HoodieFlinkClusteringJob.AsyncClusteringService}): inserts data
 * through SQL in append mode, starts the service in schedule mode, waits for
 * it to schedule and execute a clustering round, then validates the table data.
 */
@Test
public void testHoodieFlinkClusteringService() throws Exception {
// Create hoodie table and insert into data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
// use append mode; inline insert clustering is disabled so the service does the work
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);
// Make configuration and setAvroSchema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
cfg.path = tempFile.getAbsolutePath();
// short interval so the service reacts within the sleep window below
cfg.minClusteringIntervalSeconds = 3;
cfg.schedule = true;
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env);
asyncClusteringService.start(null);
// give the service time to schedule and execute the clustering round
TimeUnit.SECONDS.sleep(5);
asyncClusteringService.shutDown();
TestData.checkWrittenData(tempFile, EXPECTED, 4);
}
}

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
/**
* A wrapper class to manipulate the {@link ClusteringOperator} instance for testing.
*
* <p>Chains the three clustering stages — plan generation, plan execution and
* commit — in a single synchronous call path so tests can drive a full
* clustering round per checkpoint without a real Flink cluster.
*/
public class ClusteringFunctionWrapper {
private final Configuration conf;
private final IOManager ioManager;
private final StreamingRuntimeContext runtimeContext;
private final StreamTask<?, ?> streamTask;
private final StreamConfig streamConfig;
/**
* Function that generates the {@code HoodieClusteringPlan}.
*/
private ClusteringPlanOperator clusteringPlanOperator;
/**
* Output to collect the clustering commit events.
*/
private CollectorOutput<ClusteringCommitEvent> commitEventOutput;
/**
* Function that executes the clustering task.
*/
private ClusteringOperator clusteringOperator;
/**
* Stream sink to handle clustering commits.
*/
private ClusteringCommitSink commitSink;
/**
* Creates the wrapper with a mock task environment.
*
* @param conf         the hudi Flink configuration
* @param streamTask   the mock stream task used to set up the clustering operator
* @param streamConfig the stream config passed to the clustering operator
*/
public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, StreamConfig streamConfig) {
this.ioManager = new IOManagerAsync();
MockEnvironment environment = new MockEnvironmentBuilder()
.setTaskName("mockTask")
.setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
.setIOManager(ioManager)
.build();
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
this.conf = conf;
this.streamTask = streamTask;
this.streamConfig = streamConfig;
}
/**
* Instantiates and opens the plan operator, the clustering operator and the
* commit sink, wiring the operator output to a collector so commit events can
* be inspected synchronously.
*/
public void openFunction() throws Exception {
clusteringPlanOperator = new ClusteringPlanOperator(conf);
clusteringPlanOperator.open();
clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE);
// CAUTION: deprecated API used.
clusteringOperator.setProcessingTimeService(new TestProcessingTimeService());
commitEventOutput = new CollectorOutput<>();
clusteringOperator.setup(streamTask, streamConfig, commitEventOutput);
clusteringOperator.open();
// execute synchronously so a test can assert right after the call returns
final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
new MockOperatorCoordinatorContext(new OperatorID(), 1));
clusteringOperator.setExecutor(syncExecutor);
commitSink = new ClusteringCommitSink(conf);
commitSink.setRuntimeContext(runtimeContext);
commitSink.open(conf);
}
/**
* Drives one full clustering round for the given checkpoint: triggers plan
* generation via checkpoint completion, runs each produced plan event through
* the clustering operator, then feeds the commit events into the sink.
*
* @param checkpointID id of the completed checkpoint that triggers the plan
*/
public void cluster(long checkpointID) throws Exception {
// collect the ClusteringPlanEvents.
CollectorOutput<ClusteringPlanEvent> planOutput = new CollectorOutput<>();
clusteringPlanOperator.setOutput(planOutput);
clusteringPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the ClusteringCommitEvents
for (ClusteringPlanEvent event : planOutput.getRecords()) {
clusteringOperator.processElement(new StreamRecord<>(event));
}
// handle and commit the clustering
for (ClusteringCommitEvent event : commitEventOutput.getRecords()) {
commitSink.invoke(event, null);
}
}
/**
* Releases the async I/O manager resources.
*/
public void close() throws Exception {
ioManager.close();
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.sink.utils;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.append.AppendWriteFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
@@ -25,6 +27,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -34,9 +37,12 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -57,12 +63,15 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
private final StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
private final boolean asyncClustering;
private ClusteringFunctionWrapper clusteringFunctionWrapper;
/**
* Append write function.
*/
private AppendWriteFunction<RowData> writeFunction;
public InsertFunctionWrapper(String tablePath, Configuration conf) {
public InsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
IOManager ioManager = new IOManagerAsync();
MockEnvironment environment = new MockEnvironmentBuilder()
.setTaskName("mockTask")
@@ -77,6 +86,15 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.stateInitializationContext = new MockStateInitializationContext();
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
StreamConfig streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
StreamTask<?, ?> streamTask = new MockStreamTaskBuilder(environment)
.setConfig(new StreamConfig(conf))
.setExecutionConfig(new ExecutionConfig().enableObjectReuse())
.build();
this.clusteringFunctionWrapper = new ClusteringFunctionWrapper(this.conf, streamTask, streamConfig);
}
public void openFunction() throws Exception {
@@ -84,6 +102,10 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
setupWriteFunction();
if (asyncClustering) {
clusteringFunctionWrapper.openFunction();
}
}
public void invoke(I record) throws Exception {
@@ -109,6 +131,13 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
/**
 * Simulates a successful checkpoint: acks the operator state store, notifies
 * the write coordinator, and — when async clustering is enabled — runs a full
 * clustering round for this checkpoint.
 *
 * @param checkpointId id of the checkpoint that completed
 */
public void checkpointComplete(long checkpointId) {
stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
if (asyncClustering) {
try {
clusteringFunctionWrapper.cluster(checkpointId);
} catch (Exception e) {
// surface clustering failures as unchecked hudi exceptions to fail the test
throw new HoodieException(e);
}
}
}
public StreamWriteOperatorCoordinator getCoordinator() {
@@ -118,6 +147,9 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
/**
 * Closes the coordinator and, if present, the clustering wrapper's resources.
 */
@Override
public void close() throws Exception {
this.coordinator.close();
if (clusteringFunctionWrapper != null) {
clusteringFunctionWrapper.close();
}
}
public BulkInsertWriterHelper getWriterHelper() {

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -28,7 +29,6 @@ import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.api.common.ExecutionConfig;
@@ -120,8 +120,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
this.streamTask = new MockStreamTaskBuilder(environment)

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -337,7 +338,9 @@ public class TestWriteBase {
public TestHarness checkWrittenData(
Map<String, String> expected,
int partitions) throws Exception {
if (OptionsResolver.isCowTable(conf) || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
if (OptionsResolver.isCowTable(conf)
|| conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
|| OptionsResolver.isAppendMode(conf)) {
TestData.checkWrittenData(this.baseFile, expected, partitions);
} else {
checkWrittenDataMor(baseFile, expected, partitions);
@@ -419,7 +422,7 @@ public class TestWriteBase {
protected String lastCompleteInstant() {
return OptionsResolver.isMorTable(conf)
? TestUtils.getLastDeltaCompleteInstant(basePath)
: TestUtils.getLastCompleteInstant(basePath);
: TestUtils.getLastCompleteInstant(basePath, HoodieTimeline.COMMIT_ACTION);
}
}
}

View File

@@ -50,6 +50,16 @@ public class TestUtils {
return StreamerUtil.getLastCompletedInstant(metaClient);
}
/**
 * Returns the timestamp of the latest completed instant on the commits
 * timeline whose action equals the given commit action, or {@code null}
 * when the timeline contains no such completed instant.
 *
 * @param basePath     base path of the hoodie table
 * @param commitAction action string to filter by, e.g. {@code HoodieTimeline.COMMIT_ACTION}
 */
public static String getLastCompleteInstant(String basePath, String commitAction) {
final HoodieTableMetaClient client = HoodieTableMetaClient.builder()
.setConf(HadoopConfigurations.getHadoopConf(new Configuration()))
.setBasePath(basePath)
.build();
return client.getCommitsTimeline()
.filterCompletedInstants()
.filter(i -> commitAction.equals(i.getAction()))
.lastInstant()
.map(HoodieInstant::getTimestamp)
.orElse(null);
}
public static String getLastDeltaCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build();