
[HUDI-1984] Support independent flink hudi compaction function (#3046)

This commit is contained in:
swuferhong
2021-06-13 15:04:46 +08:00
committed by GitHub
parent ba728d822f
commit 0c4f2fdc15
14 changed files with 772 additions and 63 deletions


@@ -28,12 +28,16 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -50,12 +54,15 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);
private final Option<Map<String, String>> extraMetadata;
public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
this.extraMetadata = extraMetadata;
}
@Override
@@ -149,4 +156,41 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
}
return timestamp;
}
@Override
public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// if there are inflight writes, their instant time must be later than the compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
}
HoodieCompactionPlan plan = scheduleCompaction();
if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
HoodieInstant compactionInstant =
new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
try {
table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
TimelineMetadataUtils.serializeCompactionPlan(plan));
} catch (IOException ioe) {
throw new HoodieIOException("Exception scheduling compaction", ioe);
}
return Option.of(plan);
}
return Option.empty();
}
}
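
For orientation, here is a minimal, non-authoritative sketch of how this executor gets driven in this commit: the standalone compaction job (HoodieFlinkCompactor, added below) builds a write client and schedules a compaction at an explicit instant time. The class name and the locally built Configuration are illustrative only, not committed code.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

// Sketch only; assumes an existing MERGE_ON_READ table under the configured path.
public class SchedulingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // normally built from FlinkCompactionConfig
    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
    // With the Flink engine this call ends up in the execute() method shown above,
    // persisting the plan as a compaction.requested instant on the timeline.
    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
  }
}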


@@ -25,8 +25,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -520,39 +518,6 @@ public class FlinkOptions {
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";
/**
* Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
* in the properties file (set by `--props` option) and cmd line options
* (set by `--hoodie-conf` option).
*/
@SuppressWarnings("unchecked, rawtypes")
public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) {
Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
// keygenClass has higher priority than keygenType
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
return conf;
}
/**
* Collects the config options that start with 'properties.' into a 'key'='value' list.
*/


@@ -46,7 +46,7 @@ public class CleanFunction<T> extends AbstractRichFunction
private final Configuration conf;
private HoodieFlinkWriteClient writeClient;
protected HoodieFlinkWriteClient writeClient;
private NonThrownExecutor executor;
private volatile boolean isCleaning;


@@ -33,6 +33,7 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
@@ -79,22 +80,24 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
final CompactionOperation compactionOperation = event.getOperation();
// executes the compaction task asynchronously so it does not block checkpoint barrier propagation.
executor.execute(
() -> {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}, "Execute compaction for instant %s from task %d", instantTime, taskID
() -> doCompaction(instantTime, compactionOperation, collector), "Execute compaction for instant %s from task %d", instantTime, taskID
);
}
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}
@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;


@@ -19,12 +19,12 @@
package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.util.StreamerUtil;
@@ -59,11 +59,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
*/
private final Configuration conf;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
* Buffer to collect the event from each compact task {@code CompactFunction}.
* The key is the instant time.
@@ -78,7 +73,9 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
if (writeClient == null) {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
}
this.commitBuffer = new HashMap<>();
}
@@ -122,6 +119,13 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
}
// commit the compaction
this.writeClient.commitCompaction(instant, statuses, Option.empty());
// Clean up the old files after the compaction commit when async cleaning is disabled
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient.startAsyncCleaning();
this.writeClient.waitForCleaningFinish();
}
// reset the status
reset(instant);
}
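
A small, hedged configuration sketch for the cleaning branch above; the option keys are the ones referenced in this commit, while the values and class name are illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class CleaningConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // With async cleaning disabled, the sink above triggers a cleaning pass right
    // after committing the compaction and waits for it to finish.
    conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, false);
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, 10); // number of commits the cleaner retains
  }
}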


@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* Flink hudi compaction source function.
*
* <p>This function reads the compaction plan as {@link CompactionOperation}s and then assigns each compaction task
* event ({@link CompactionPlanEvent}) to the downstream operators.
*
* <p>The compaction instant time is determined with the following strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
* as the instant time;</li>
* <li>If the timeline has inflight instants,
* use the {earliest inflight instant time - 1ms} as the instant time.</li>
* </ul>
*/
public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);
/**
* Compaction instant time.
*/
private String compactionInstantTime;
/**
* Hoodie flink table.
*/
private HoodieFlinkTable<?> table;
/**
* The compaction plan.
*/
private HoodieCompactionPlan compactionPlan;
/**
* Hoodie instant.
*/
private HoodieInstant instant;
public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
this.table = table;
this.instant = instant;
this.compactionPlan = compactionPlan;
this.compactionInstantTime = compactionInstantTime;
}
@Override
public void open(Configuration parameters) throws Exception {
// no operation
}
@Override
public void run(SourceContext sourceContext) throws Exception {
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanFunction compacting " + operations + " files");
for (CompactionOperation operation : operations) {
sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation));
}
}
@Override
public void close() throws Exception {
// no operation
}
@Override
public void cancel() {
// no operation
}
}


@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact;
import org.apache.hudi.configuration.FlinkOptions;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
/**
* Configurations for Hoodie Flink compaction.
*/
public class FlinkCompactionConfig extends Configuration {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
// ------------------------------------------------------------------------
// Hudi Write Options
// ------------------------------------------------------------------------
@Parameter(names = {"--path"}, description = "Base path for the target hoodie table.", required = true)
public String path;
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
public static final String NUM_AND_TIME = "num_and_time";
public static final String NUM_OR_TIME = "num_or_time";
@Parameter(names = {"--compaction-trigger-strategy"},
description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ "Default is 'num_commits'",
required = false)
public String compactionTriggerStrategy = NUM_COMMITS;
@Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits", required = false)
public Integer compactionDeltaCommits = 1;
@Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour", required = false)
public Integer compactionDeltaSeconds = 3600;
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false)
public Boolean cleanAsyncEnable = false;
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10",
required = false)
public Integer cleanRetainCommits = 10;
@Parameter(names = {"--archive-min-commits"},
description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.",
required = false)
public Integer archiveMinCommits = 20;
@Parameter(names = {"--archive-max-commits"},
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.",
required = false)
public Integer archiveMaxCommits = 30;
@Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
public Integer compactionMaxMemory = 100;
/**
* Transforms a {@code FlinkCompactionConfig} into a Flink {@code Configuration}.
* The latter is more suitable for the table APIs: it maps the command line
* options of the standalone compaction job onto the corresponding {@code FlinkOptions}.
*/
public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCompactionConfig config) {
org.apache.flink.configuration.Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, config.path);
conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, config.compactionTriggerStrategy);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits);
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
return conf;
}
}
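
A minimal usage sketch, assuming the same entry-point pattern as HoodieFlinkCompactor#main below: JCommander fills the config from the command line and toFlinkConfig maps it onto the corresponding FlinkOptions. The sample arguments and class name are made up.

import com.beust.jcommander.JCommander;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;

public class FlinkCompactionConfigSketch {
  public static void main(String[] args) {
    // e.g. args = {"--path", "hdfs:///tmp/hoodie/t1", "--compaction-delta-commits", "5"} (illustrative)
    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
    new JCommander(cfg, null, args);
    // Maps the parsed CLI options onto the FlinkOptions consumed by the compaction operators.
    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
    System.out.println(conf.getString(FlinkOptions.PATH));
  }
}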


@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Flink hudi compaction program that can be executed manually.
*/
public class HoodieFlinkCompactor {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
// create metaClient
HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
// get the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
// compute the compaction instant time and schedule the compaction
String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// the last instant takes the highest priority.
Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null;
if (compactionInstantTime == null) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
}
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// No compaction plan, do nothing and return.
LOG.info("No compaction plan for this job and instant " + compactionInstantTime);
return;
}
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (!pendingCompactionTimeline.containsInstant(instant)) {
// This means the compaction plan was written to the auxiliary path (.tmp)
// but not to the meta path (.hoodie), which usually happens when the job
// crashes unexpectedly.
// Clean up the compaction plan in the auxiliary path and cancel the compaction.
LOG.warn("The compaction plan was found under the auxiliary path (.tmp) but not under the meta path (.hoodie).\n"
+ "Cleaning the compaction plan in the auxiliary path and cancelling the compaction");
cleanInstant(table.getMetaClient(), instant);
return;
}
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
.setParallelism(compactionPlan.getOperations().size())
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
.setParallelism(1);
env.execute("flink_hudi_compaction");
}
private static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
try {
if (metaClient.getFs().exists(commitFilePath)) {
boolean deleted = metaClient.getFs().delete(commitFilePath, false);
if (deleted) {
LOG.info("Removed instant " + instant);
} else {
throw new HoodieIOException("Could not delete instant " + instant);
}
}
} catch (IOException e) {
throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
}
}
}


@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* Function that executes the actual compaction task assigned by the compaction plan source.
* The input compaction events ({@link CompactionPlanEvent}) are distributed evenly across its parallel subtasks.
*/
public class NonKeyedCompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
private static final Logger LOG = LoggerFactory.getLogger(NonKeyedCompactFunction.class);
/**
* Config options.
*/
private final Configuration conf;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
* Id of current subtask.
*/
private int taskID;
/**
* Executor service to execute the compaction task.
*/
private transient NonThrownExecutor executor;
public NonKeyedCompactFunction(Configuration conf) {
this.conf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.executor = new NonThrownExecutor(LOG);
}
@Override
public void processElement(CompactionPlanEvent event, Context ctx, Collector<CompactionCommitEvent> collector) throws Exception {
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
doCompaction(instantTime, compactionOperation, collector);
}
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}
@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
}
}


@@ -21,13 +21,17 @@ package org.apache.hudi.streamer;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Configurations for Hoodie Flink streamer.
@@ -124,4 +128,35 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
public Integer writeTaskNum = 4;
/**
* Transforms a {@code FlinkStreamerConfig} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties
* in the properties file (set by `--props` option) and cmd line options
* (set by `--hoodie-conf` option).
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkStreamerConfig config) {
Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works the same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
return conf;
}
}


@@ -23,11 +23,11 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -81,7 +81,7 @@ public class HoodieFlinkStreamer {
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
.getLogicalType();
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);


@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
/**
* Utilities for flink hudi compaction.
*/
public class CompactionUtil {
private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class);
/**
* Creates the {@code HoodieTableMetaClient}.
*/
public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
}
/**
* Gets the compaction instant time.
*/
public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
Option<HoodieInstant> hoodieInstantOption = metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
if (hoodieInstantOption.isPresent()) {
HoodieInstant firstInstant = hoodieInstantOption.get();
String newCommitTime = StreamerUtil.instantTimeSubtract(firstInstant.getTimestamp(), 1);
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + newCommitTime + ") Instants :"
+ conflictingInstants);
return newCommitTime;
} else {
return HoodieActiveTimeline.createNewInstantTime();
}
}
/**
* Sets up the Avro schema string in the given configuration {@code conf}
* by reading it from the hoodie table metadata.
*
* @param conf       The configuration
* @param metaClient The hoodie table meta client
*/
public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
}
}
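
To make the inflight branch above concrete, a tiny hedged example of the instant-time arithmetic; the timestamp is made up, and StreamerUtil#instantTimeSubtract is the helper added further down in this commit.

import org.apache.hudi.util.StreamerUtil;

public class InstantTimeSketch {
  public static void main(String[] args) {
    // Earliest inflight write instant on the timeline (illustrative value).
    String earliestInflight = "20210613150446";
    // The compaction instant is placed just before it so that it sorts strictly
    // earlier than every pending write, matching the conflict check above.
    String compactionInstant = StreamerUtil.instantTimeSubtract(earliestInflight, 1);
    System.out.println(compactionInstant); // prints 20210613150445
  }
}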


@@ -90,7 +90,7 @@ public class StreamerUtil {
}
public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema();
return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
}
public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
@@ -150,7 +150,7 @@ public class StreamerUtil {
}
public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) {
return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
return getHoodieClientConfig(FlinkStreamerConfig.toFlinkConfig(conf));
}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
@@ -302,4 +302,12 @@ public class StreamerUtil {
long oldTime = Long.parseLong(oldInstant);
return String.valueOf(oldTime + milliseconds);
}
/**
* Subtracts the given milliseconds from the old instant time and returns the result.
*/
public static String instantTimeSubtract(String oldInstant, long milliseconds) {
long oldTime = Long.parseLong(oldInstant);
return String.valueOf(oldTime - milliseconds);
}
}


@@ -18,8 +18,15 @@
package org.apache.hudi.sink;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.compact.CompactFunction;
@@ -27,10 +34,15 @@ import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.NonKeyedCompactFunction;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -50,13 +62,20 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -72,6 +91,8 @@ import java.util.concurrent.TimeUnit;
*/
public class StreamWriteITCase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class);
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
static {
@@ -147,6 +168,83 @@ public class StreamWriteITCase extends TestLogger {
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
@Test
public void testHoodieFlinkCompactor() throws Exception {
// Create the hoodie table and insert data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+ "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+ "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+ "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+ "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
TableResult tableResult = tableEnv.executeSql(insertInto);
TimeUnit.SECONDS.sleep(5);
tableResult.await();
// Make configuration and setAvroSchema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition");
// create metaClient
HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
// set the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
// compute the compaction instant time and schedule the compaction
String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
// the last instant takes the highest priority.
Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
String compactionInstantTime = compactionInstant.get().getTimestamp();
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
.setParallelism(compactionPlan.getOperations().size())
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
.setParallelism(1);
env.execute("flink_hudi_compaction");
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
@Test
public void testMergeOnReadWriteWithCompaction() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());