From aa6342c3c9af53e7064546becf987e26534d0764 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Fri, 18 Jun 2021 09:35:09 +0800
Subject: [PATCH] [HUDI-2036] Move the compaction plan scheduling out of flink
 writer coordinator (#3101)

Since HUDI-1955 was fixed, we can move the scheduling out of the
coordinator to make the coordinator more lightweight.
---
 ...FlinkScheduleCompactionActionExecutor.java |  40 -------
 .../sink/StreamWriteOperatorCoordinator.java  |  10 --
 .../hudi/sink/compact/CompactFunction.java    |  28 +++--
 .../sink/compact/CompactionCommitSink.java    |   3 +-
 .../sink/compact/CompactionPlanOperator.java  |  44 ++++----
 .../compact/CompactionPlanSourceFunction.java |   3 +-
 .../sink/compact/FlinkCompactionConfig.java   |   2 +
 .../sink/compact/HoodieFlinkCompactor.java    |  36 ++-----
 .../sink/compact/NonKeyedCompactFunction.java | 102 ------------------
 .../hudi/streamer/HoodieFlinkStreamer.java    |   6 +-
 .../apache/hudi/table/HoodieTableSink.java    |   7 +-
 .../org/apache/hudi/util/CompactionUtil.java  |  51 +++++----
 .../org/apache/hudi/util/StreamerUtil.java    |  14 +--
 .../apache/hudi/sink/StreamWriteITCase.java   |  76 ++++++-------
 .../hudi/sink/TestWriteMergeOnRead.java       |   7 ++
 .../utils/StreamWriteFunctionWrapper.java     |  18 +++-
 16 files changed, 156 insertions(+), 291 deletions(-)
 delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
index d18cac23f..4143944bb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
@@ -28,14 +28,11 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.log4j.LogManager;
@@ -156,41 +153,4 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload>
-
-  @Override
-  public Option<HoodieCompactionPlan> execute() {
-    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
-        && !config.getFailedWritesCleanPolicy().isLazy()) {
-      // if there are inflight writes, their instantTime must not be less than that of compaction instant time
-      table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
-          .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
-              HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
-              "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
-                  + ", Compaction scheduled at " + instantTime));
-      // Committed and pending compaction instants should have strictly lower timestamps
-      List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
-          .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
-          .filter(instant -> HoodieTimeline.compareTimestamps(
-              instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
-          .collect(Collectors.toList());
-      ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
-          "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
-              + conflictingInstants);
-    }
-
-    HoodieCompactionPlan plan = scheduleCompaction();
-    if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
-      extraMetadata.ifPresent(plan::setExtraMetadata);
-      HoodieInstant compactionInstant =
-          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
-      try {
-        table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
-            TimelineMetadataUtils.serializeCompactionPlan(plan));
-      } catch (IOException ioe) {
-        throw new HoodieIOException("Exception scheduling compaction", ioe);
-      }
-      return Option.of(plan);
-    }
-    return Option.empty();
-  }
 }
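Review note: with the execute() override deleted here, the Flink writer no longer drives plan scheduling itself; the reworked CompactionPlanOperator further down in this patch asks the write client to schedule instead. A condensed sketch of the new call path, assuming a write client and a Flink Configuration built the same way as elsewhere in this patch:

    // Sketch only: derive a compaction instant time from the timeline, then
    // ask the write client to schedule a plan at it; `scheduled` comes back
    // false when there is nothing to compact yet.
    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
    boolean scheduled = writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());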
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index da771e035..faa5264d9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -104,11 +104,6 @@ public class StreamWriteOperatorCoordinator
    */
   private final int parallelism;
 
-  /**
-   * Whether needs to schedule compaction task on finished checkpoints.
-   */
-  private final boolean needsScheduleCompaction;
-
   /**
    * A single-thread executor to handle all the asynchronous jobs of the coordinator.
    */
@@ -141,7 +136,6 @@ public class StreamWriteOperatorCoordinator
     this.conf = conf;
     this.context = context;
     this.parallelism = context.currentParallelism();
-    this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
   }
 
   @Override
@@ -203,10 +197,6 @@ public class StreamWriteOperatorCoordinator
       // so a successful checkpoint subsumes the old one (follows the checkpoint subsuming contract)
       final boolean committed = commitInstant();
       if (committed) {
-        // if async compaction is on, schedule the compaction
-        if (needsScheduleCompaction) {
-          writeClient.scheduleCompaction(Option.empty());
-        }
         // start new instant.
         startInstant();
         // sync Hive if it is enabled
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 13a9f45fd..4598583cc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -28,7 +28,7 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ import java.util.List;
  * Function to execute the actual compaction task assigned by the compaction plan task.
  * In order to execute scalably, the input should be shuffled by the compact event {@link CompactionPlanEvent}.
  */
-public class CompactFunction extends KeyedProcessFunction<Object, CompactionPlanEvent, CompactionCommitEvent> {
+public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
   private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
 
   /**
@@ -53,6 +53,11 @@ public class CompactFunction extends KeyedProcessFunction<Object, CompactionPlanEvent, CompactionCommitEvent> {
    */
   private transient HoodieFlinkWriteClient writeClient;
 
+  /**
+   * Whether to execute compaction task asynchronously.
+   */
+  private final boolean asyncCompaction;
+
   /**
    * Id of current subtask.
    */
@@ -66,6 +71,7 @@ public class CompactFunction extends KeyedProcessFunction<Object, CompactionPlanEvent, CompactionCommitEvent> {
 
   public CompactFunction(Configuration conf) {
     this.conf = conf;
+    this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
   }
 
   @Override
@@ -75,10 +81,16 @@ public class CompactFunction extends KeyedProcessFunction<Object, CompactionPlanEvent, CompactionCommitEvent> {
   public void processElement(CompactionPlanEvent event, Context ctx, Collector<CompactionCommitEvent> collector) throws Exception {
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
-    // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
-    executor.execute(
-        () -> doCompaction(instantTime, compactionOperation, collector), "Execute compaction for instant %s from task %d", instantTime, taskID
-    );
+    if (asyncCompaction) {
+      // executes the compaction task asynchronously so as not to block checkpoint barrier propagation.
+      executor.execute(
+          () -> doCompaction(instantTime, compactionOperation, collector),
+          "Execute compaction for instant %s from task %d", instantTime, taskID);
+    } else {
+      // executes the compaction task synchronously for batch mode.
+      LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
+      doCompaction(instantTime, compactionOperation, collector);
+    }
   }
 
   private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
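Review note: the flag gating the branch above is worth spelling out, since CompactFunction now serves two very different jobs: the checkpointed streaming pipeline, which must not block barrier propagation, and the bounded compactor job, which would reach end-of-input before an asynchronous task finishes. A minimal sketch of how the flag is derived, inferred from the StreamerUtil rename and the option that FlinkCompactionConfig forces off later in this patch:

    // true only for a MERGE_ON_READ table with async compaction enabled;
    // the standalone compactor pins FlinkOptions.COMPACTION_ASYNC_ENABLED
    // to false, so it always takes the synchronous branch
    boolean asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);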
+ LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID); + doCompaction(instantTime, compactionOperation, collector); + } } private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector collector) throws IOException { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 0884342de..e6c4cedaa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -122,8 +122,7 @@ public class CompactionCommitSink extends CleanFunction { // Whether to cleanup the old log file when compaction if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { - this.writeClient.startAsyncCleaning(); - this.writeClient.waitForCleaningFinish(); + this.writeClient.clean(); } // reset the status diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index e48f4ed55..af9bae766 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -26,8 +26,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; @@ -84,21 +83,34 @@ public class CompactionPlanOperator extends AbstractStreamOperator table = writeClient.getHoodieTable(); - // the last instant takes the highest priority. - Option compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant(); - String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null; - if (compactionInstantTime == null) { + try { + scheduleCompaction(checkpointId); + } catch (Throwable throwable) { + // make it fail safe + LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable); + } + } + + private void scheduleCompaction(long checkpointId) throws IOException { + // if async compaction is on, schedule the compaction + HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); + final String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient); + + boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); + if (!scheduled) { // do nothing. 
LOG.info("No compaction plan for checkpoint " + checkpointId); return; } + if (this.compactionInstantTime != null && Objects.equals(this.compactionInstantTime, compactionInstantTime)) { // do nothing LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore"); return; } + + HoodieFlinkTable table = writeClient.getHoodieTable(); // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( @@ -121,7 +133,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator> output) { this.output = output; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java index 5d5a008ab..bb0c7cb0f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java @@ -46,7 +46,8 @@ import static java.util.stream.Collectors.toList; * use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()} * as the instant time; *
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
index 5d5a008ab..bb0c7cb0f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -46,7 +46,8 @@ import static java.util.stream.Collectors.toList;
  * use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
  * as the instant time;
  *   <li>If the timeline has inflight instants,
- * use the {earliest inflight instant time - 1ms} as the instant time.</li>
+ * use the median instant time between [last complete instant time, earliest inflight instant time]
+ * as the instant time.</li>
  *
  */
 public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 8e0c671f5..ddc73a594 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -101,6 +101,8 @@ public class FlinkCompactionConfig extends Configuration {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
     conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
     conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
+    // always use synchronous compaction
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     return conf;
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 57b5c06ae..0ba2351b9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
@@ -36,12 +35,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Flink hudi compaction program that can be executed manually.
  */
@@ -72,19 +68,15 @@ public class HoodieFlinkCompactor {
 
     // judge whether there are operations
     // to compute the compaction instant time and do compaction.
-    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
     HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
-    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
-
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-    // the last instant takes the highest priority.
-    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
-    String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null;
-    if (compactionInstantTime == null) {
+    boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    if (!scheduled) {
+      // do nothing.
LOG.info("No compaction plan for this job "); return; } + HoodieFlinkTable table = writeClient.getHoodieTable(); // generate compaction plan // should support configurable commit metadata HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( @@ -108,7 +100,7 @@ public class HoodieFlinkCompactor { LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + "Clean the compaction plan in auxiliary path and cancels the compaction"); - cleanInstant(table.getMetaClient(), instant); + CompactionUtil.cleanInstant(table.getMetaClient(), instant); return; } @@ -118,7 +110,7 @@ public class HoodieFlinkCompactor { .rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new NonKeyedCompactFunction(conf))) + new ProcessOperator<>(new CompactFunction(conf))) .setParallelism(compactionPlan.getOperations().size()) .addSink(new CompactionCommitSink(conf)) .name("clean_commits") @@ -127,20 +119,4 @@ public class HoodieFlinkCompactor { env.execute("flink_hudi_compaction"); } - - private static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { - Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); - try { - if (metaClient.getFs().exists(commitFilePath)) { - boolean deleted = metaClient.getFs().delete(commitFilePath, false); - if (deleted) { - LOG.info("Removed instant " + instant); - } else { - throw new HoodieIOException("Could not delete instant " + instant); - } - } - } catch (IOException e) { - throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e); - } - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java deleted file mode 100644 index f1be78cee..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java
deleted file mode 100644
index f1be78cee..000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact;
-
-import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
-import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
-import org.apache.hudi.util.StreamerUtil;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Function to execute the actual compaction task assigned by the compaction plan task.
- * The input compact event {@link CompactionPlanEvent}s were distributed evenly to this function.
- */
-public class NonKeyedCompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
-  private static final Logger LOG = LoggerFactory.getLogger(NonKeyedCompactFunction.class);
-
-  /**
-   * Config options.
-   */
-  private final Configuration conf;
-
-  /**
-   * Write Client.
-   */
-  private transient HoodieFlinkWriteClient writeClient;
-
-  /**
-   * Id of current subtask.
-   */
-  private int taskID;
-
-  /**
-   * Executor service to execute the compaction task.
-   */
-  private transient NonThrownExecutor executor;
-
-  public NonKeyedCompactFunction(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
-    this.executor = new NonThrownExecutor(LOG);
-  }
-
-  @Override
-  public void processElement(CompactionPlanEvent event, Context ctx, Collector<CompactionCommitEvent> collector) throws Exception {
-    final String instantTime = event.getCompactionInstantTime();
-    final CompactionOperation compactionOperation = event.getOperation();
-    doCompaction(instantTime, compactionOperation, collector);
-  }
-
-  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
-    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
-    List<WriteStatus> writeStatuses = compactor.compact(
-        new HoodieFlinkCopyOnWriteTable<>(
-            this.writeClient.getConfig(),
-            this.writeClient.getEngineContext(),
-            this.writeClient.getHoodieTable().getMetaClient()),
-        this.writeClient.getHoodieTable().getMetaClient(),
-        this.writeClient.getConfig(),
-        compactionOperation,
-        instantTime);
-    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
-  }
-
-  @VisibleForTesting
-  public void setExecutor(NonThrownExecutor executor) {
-    this.executor = executor;
-  }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index eaca4c94b..bee5fe641 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -117,16 +117,16 @@ public class HoodieFlinkStreamer {
         .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write")
         .setParallelism(numWriteTask);
-    if (StreamerUtil.needsScheduleCompaction(conf)) {
+    if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",
           TypeInformation.of(CompactionPlanEvent.class),
           new CompactionPlanOperator(conf))
           .uid("uid_compact_plan_generate")
           .setParallelism(1) // plan generate must be singleton
-          .keyBy(event -> event.getOperation().hashCode())
+          .rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
-              new KeyedProcessOperator<>(new CompactFunction(conf)))
+              new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
           .addSink(new CompactionCommitSink(conf))
           .name("compact_commit")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index c478893d3..ae25644d1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -38,7 +38,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -99,16 +98,16 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
           .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
           .name("uid_hoodie_stream_write")
           .setParallelism(numWriteTasks);
-      if (StreamerUtil.needsScheduleCompaction(conf)) {
+      if (StreamerUtil.needsAsyncCompaction(conf)) {
         return pipeline.transform("compact_plan_generate",
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
             .name("uid_compact_plan_generate")
             .setParallelism(1) // plan generate must be singleton
-            .keyBy(event -> event.getOperation().hashCode())
+            .rebalance()
             .transform("compact_task",
                 TypeInformation.of(CompactionCommitEvent.class),
-                new KeyedProcessOperator<>(new CompactFunction(conf)))
+                new ProcessOperator<>(new CompactFunction(conf)))
             .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
             .addSink(new CompactionCommitSink(conf))
             .name("compact_commit")
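Review note: needsAsyncCompaction is now the single switch deciding whether the sink appends the plan-generate/compact/commit operators. A sketch of table options that exercise the async path, mirroring the option map built in StreamWriteITCase below (the path value is a placeholder):

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hudi/t1"); // placeholder
    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); // compaction only applies to MOR
    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");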
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 5ffd58d49..46d727262 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -22,18 +22,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.stream.Collectors;
+import java.io.IOException;
 
 /**
  * Utilities for flink hudi compaction.
@@ -44,29 +43,24 @@ public class CompactionUtil {
 
   /**
    * Creates the metaClient.
-   * */
+   */
   public static HoodieTableMetaClient createMetaClient(Configuration conf) {
     return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
   }
 
   /**
    * Gets the compaction instant time.
-   * */
+   */
   public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
-    Option<HoodieInstant> hoodieInstantOption = metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
-    if (hoodieInstantOption.isPresent()) {
-      HoodieInstant firstInstant = hoodieInstantOption.get();
-      String newCommitTime = StreamerUtil.instantTimeSubtract(firstInstant.getTimestamp(), 1);
+    Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
+        .filterPendingExcludingCompaction().firstInstant();
+    Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
+        .filterCompletedAndCompactionInstants().lastInstant();
+    if (firstPendingInstant.isPresent() && lastCompleteInstant.isPresent()) {
+      String firstPendingTimestamp = firstPendingInstant.get().getTimestamp();
+      String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
       // Committed and pending compaction instants should have strictly lower timestamps
-      List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
-          .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
-          .filter(instant -> HoodieTimeline.compareTimestamps(
-              instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime))
-          .collect(Collectors.toList());
-      ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
-          "Following instants have timestamps >= compactionInstant (" + newCommitTime + ") Instants :"
-              + conflictingInstants);
-      return newCommitTime;
+      return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
     } else {
       return HoodieActiveTimeline.createNewInstantTime();
     }
@@ -83,4 +77,23 @@ public class CompactionUtil {
     Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
     conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
   }
+
+  /**
+   * Cleans the metadata file for the given instant {@code instant}.
+   */
+  public static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
+        if (deleted) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
+    }
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index caecba4dc..fcbdb21b5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -276,7 +276,7 @@ public class StreamerUtil {
    * Returns whether the async compaction needs to be scheduled.
    * @param conf The flink configuration.
    */
-  public static boolean needsScheduleCompaction(Configuration conf) {
+  public static boolean needsAsyncCompaction(Configuration conf) {
     return conf.getString(FlinkOptions.TABLE_TYPE)
         .toUpperCase(Locale.ROOT)
         .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
@@ -304,10 +304,12 @@ public class StreamerUtil {
   }
 
   /**
-   * Subtract the old instant time with given milliseconds and returns.
-   * */
-  public static String instantTimeSubtract(String oldInstant, long milliseconds) {
-    long oldTime = Long.parseLong(oldInstant);
-    return String.valueOf(oldTime - milliseconds);
+   * Returns the median instant time between the given two instant times.
+   */
+  public static String medianInstantTime(String highVal, String lowVal) {
+    long high = Long.parseLong(highVal);
+    long low = Long.parseLong(lowVal);
+    long median = low + (high - low) / 2;
+    return String.valueOf(median);
   }
 }
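Review note: a worked example of the new medianInstantTime, with illustrative timestamps in Hudi's second-granularity instant format. getCompactionInstantTime picks the median of the last completed and the first pending write instants, so the compaction instant lands between its two timeline neighbors:

    String lastComplete = "20210618093000"; // low bound (illustrative)
    String firstPending = "20210618093010"; // high bound (illustrative)
    // low + (high - low) / 2 = 20210618093005
    String instant = StreamerUtil.medianInstantTime(firstPending, lastComplete);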
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index b001c5406..8e67159a4 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -36,7 +36,6 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.compact.NonKeyedCompactFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -62,11 +61,9 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -143,8 +140,8 @@ public class StreamWriteITCase extends TestLogger {
 
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-              TypeInformation.of(HoodieRecord.class),
-              new ProcessOperator<>(new BootstrapFunction<>(conf)));
+          TypeInformation.of(HoodieRecord.class),
+          new ProcessOperator<>(new BootstrapFunction<>(conf)));
     }
 
     DataStream<Object> pipeline = hoodieDataStream
@@ -174,24 +171,26 @@ public class StreamWriteITCase extends TestLogger {
     EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
     TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
     tableEnv.getConfig().getConfiguration()
-            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
     Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     tableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 values\n"
-        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
-        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
-        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
-        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
-        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
-        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
-        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
-        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
-    TableResult tableResult = tableEnv.executeSql(insertInto);
-    TimeUnit.SECONDS.sleep(5);
-    tableResult.await();
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+        + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+        + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+        + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+        + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+        + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+        + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+        + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+    tableEnv.executeSql(insertInto).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
 
     // Make configuration and setAvroSchema.
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -199,7 +198,6 @@ public class StreamWriteITCase extends TestLogger {
     cfg.path = tempFile.getAbsolutePath();
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
     conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition");
 
     // create metaClient
     HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
@@ -212,34 +210,30 @@ public class StreamWriteITCase extends TestLogger {
 
     // judge whether there are operations
     // To compute the compaction instant time and do compaction.
-    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
     HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
-    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
+    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
 
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-    // the last instant takes the highest priority.
-    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
-    String compactionInstantTime = compactionInstant.get().getTimestamp();
-
     // generate compaction plan
     // should support configurable commit metadata
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
-            table.getMetaClient(), compactionInstantTime);
+        table.getMetaClient(), compactionInstantTime);
 
     HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
 
     env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
-            .name("compaction_source")
-            .uid("uid_compaction_source")
-            .rebalance()
-            .transform("compact_task",
-                    TypeInformation.of(CompactionCommitEvent.class),
-                    new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
-            .setParallelism(compactionPlan.getOperations().size())
-            .addSink(new CompactionCommitSink(conf))
-            .name("clean_commits")
-            .uid("uid_clean_commits")
-            .setParallelism(1);
+        .name("compaction_source")
+        .uid("uid_compaction_source")
+        .rebalance()
+        .transform("compact_task",
+            TypeInformation.of(CompactionCommitEvent.class),
+            new ProcessOperator<>(new CompactFunction(conf)))
+        .setParallelism(compactionPlan.getOperations().size())
+        .addSink(new CompactionCommitSink(conf))
+        .name("clean_commits")
+        .uid("uid_clean_commits")
+        .setParallelism(1);
 
     env.execute("flink_hudi_compaction");
     TestData.checkWrittenFullData(tempFile, EXPECTED);
@@ -284,12 +278,12 @@ public class StreamWriteITCase extends TestLogger {
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
         .setParallelism(4)
-            .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
 
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-              TypeInformation.of(HoodieRecord.class),
-              new ProcessOperator<>(new BootstrapFunction<>(conf)));
+          TypeInformation.of(HoodieRecord.class),
+          new ProcessOperator<>(new BootstrapFunction<>(conf)));
     }
 
     DataStream<Object> pipeline = hoodieDataStream
@@ -314,10 +308,10 @@ public class StreamWriteITCase extends TestLogger {
             new CompactionPlanOperator(conf))
         .uid("uid_compact_plan_generate")
         .setParallelism(1) // plan generate must be singleton
-        .keyBy(event -> event.getOperation().hashCode())
+        .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new KeyedProcessOperator<>(new CompactFunction(conf)))
+            new ProcessOperator<>(new CompactFunction(conf)))
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
         .setParallelism(1);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index b24881a53..17c13a60b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -28,11 +28,13 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -62,6 +64,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         new FlinkTaskContextSupplier(null));
   }
 
+  @Override
+  protected void setUp(Configuration conf) {
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+  }
+
   @Override
   protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
     HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 9ba8b8efe..e0a0f4b6e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -31,6 +31,7 @@ import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing.
@@ -82,6 +84,8 @@ public class StreamWriteFunctionWrapper<I> {
 
   private CompactFunctionWrapper compactFunctionWrapper;
 
+  private final boolean asyncCompaction;
+
   public StreamWriteFunctionWrapper(String tablePath) throws Exception {
     this(tablePath, TestConfigurations.getDefaultConf(tablePath));
   }
@@ -103,6 +107,7 @@ public class StreamWriteFunctionWrapper<I> {
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
     this.functionInitializationContext = new MockFunctionInitializationContext();
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
+    this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
   }
 
@@ -131,7 +136,7 @@ public class StreamWriteFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.open(conf);
 
-    if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+    if (asyncCompaction) {
       compactFunctionWrapper.openFunction();
     }
   }
@@ -208,10 +213,19 @@ public class StreamWriteFunctionWrapper<I> {
 
   public void checkpointComplete(long checkpointId) {
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+    if (asyncCompaction) {
+      // sleep for a while to give a chance for scheduling compaction,
+      // see HoodieActiveTimeline#createNewInstantTime for details.
+      try {
+        TimeUnit.SECONDS.sleep(2);
+      } catch (InterruptedException e) {
+        throw new HoodieException("Waiting for checkpoint success exception", e);
+      }
+    }
     coordinator.notifyCheckpointComplete(checkpointId);
     this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
     this.writeFunction.notifyCheckpointComplete(checkpointId);
-    if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+    if (asyncCompaction) {
       try {
         compactFunctionWrapper.compact(checkpointId);
       } catch (Exception e) {