[HUDI-2036] Move the compaction plan scheduling out of flink writer coordinator (#3101)
Since HUDI-1955 was fixed, we can move the scheduling out of the coordinator to make the coordinator more lightweight.
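At a glance: on each completed checkpoint, compaction scheduling now happens inside CompactionPlanOperator instead of the write coordinator. A condensed sketch of the relocated path, assembled from the hunks below (the class scaffolding here is simplified for illustration and is not the verbatim source):

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.util.CompactionUtil;

import org.apache.flink.configuration.Configuration;

class ScheduleOnCheckpointSketch {
  private final Configuration conf;
  private final HoodieFlinkWriteClient writeClient;

  ScheduleOnCheckpointSketch(Configuration conf, HoodieFlinkWriteClient writeClient) {
    this.conf = conf;
    this.writeClient = writeClient;
  }

  // Mirrors CompactionPlanOperator#notifyCheckpointComplete in the diff below.
  void notifyCheckpointComplete(long checkpointId) {
    try {
      scheduleCompaction(checkpointId);
    } catch (Throwable throwable) {
      // fail safe: a scheduling error must never fail the checkpoint itself
    }
  }

  private void scheduleCompaction(long checkpointId) {
    // compute the instant time from the timeline, then ask the write client
    // to schedule a compaction plan at exactly that instant
    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
    String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
    boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
    if (!scheduled) {
      return; // no compaction plan was generated for this checkpoint
    }
    // a REQUESTED compaction instant is now on the timeline; the operator
    // reads the plan back and emits one CompactionPlanEvent per operation
  }
}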
FlinkScheduleCompactionActionExecutor.java

@@ -28,14 +28,11 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.log4j.LogManager;
@@ -156,41 +153,4 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
     }
     return timestamp;
   }
-
-  @Override
-  public Option<HoodieCompactionPlan> execute() {
-    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
-        && !config.getFailedWritesCleanPolicy().isLazy()) {
-      // if there are inflight writes, their instantTime must not be less than that of compaction instant time
-      table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
-          .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
-              HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
-              "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
-                  + ", Compaction scheduled at " + instantTime));
-      // Committed and pending compaction instants should have strictly lower timestamps
-      List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
-          .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
-          .filter(instant -> HoodieTimeline.compareTimestamps(
-              instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
-          .collect(Collectors.toList());
-      ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
-          "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
-              + conflictingInstants);
-    }
-
-    HoodieCompactionPlan plan = scheduleCompaction();
-    if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) {
-      extraMetadata.ifPresent(plan::setExtraMetadata);
-      HoodieInstant compactionInstant =
-          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
-      try {
-        table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
-            TimelineMetadataUtils.serializeCompactionPlan(plan));
-      } catch (IOException ioe) {
-        throw new HoodieIOException("Exception scheduling compaction", ioe);
-      }
-      return Option.of(plan);
-    }
-    return Option.empty();
-  }
 }
StreamWriteOperatorCoordinator.java

@@ -104,11 +104,6 @@ public class StreamWriteOperatorCoordinator
    */
   private final int parallelism;
 
-  /**
-   * Whether needs to schedule compaction task on finished checkpoints.
-   */
-  private final boolean needsScheduleCompaction;
-
   /**
    * A single-thread executor to handle all the asynchronous jobs of the coordinator.
    */
@@ -141,7 +136,6 @@ public class StreamWriteOperatorCoordinator
     this.conf = conf;
     this.context = context;
     this.parallelism = context.currentParallelism();
-    this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
   }
 
   @Override
@@ -203,10 +197,6 @@ public class StreamWriteOperatorCoordinator
     // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
     final boolean committed = commitInstant();
     if (committed) {
-      // if async compaction is on, schedule the compaction
-      if (needsScheduleCompaction) {
-        writeClient.scheduleCompaction(Option.empty());
-      }
       // start new instant.
       startInstant();
       // sync Hive if is enabled
CompactFunction.java

@@ -28,7 +28,7 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,7 @@ import java.util.List;
  * Function to execute the actual compaction task assigned by the compaction plan task.
  * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}.
  */
-public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent> {
+public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
   private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
 
   /**
@@ -53,6 +53,11 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
    */
   private transient HoodieFlinkWriteClient writeClient;
 
+  /**
+   * Whether to execute compaction asynchronously.
+   */
+  private final boolean asyncCompaction;
+
   /**
    * Id of current subtask.
    */
@@ -65,23 +70,32 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
 
   public CompactFunction(Configuration conf) {
     this.conf = conf;
+    this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
   }
 
   @Override
   public void open(Configuration parameters) throws Exception {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+    if (this.asyncCompaction) {
       this.executor = new NonThrownExecutor(LOG);
     }
+  }
 
   @Override
   public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
+    if (asyncCompaction) {
       // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector), "Execute compaction for instant %s from task %d", instantTime, taskID
-      );
+          () -> doCompaction(instantTime, compactionOperation, collector),
+          "Execute compaction for instant %s from task %d", instantTime, taskID);
+    } else {
+      // executes the compaction task synchronously for batch mode.
+      LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
+      doCompaction(instantTime, compactionOperation, collector);
+    }
   }
 
   private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
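CompactFunction now serves both pipelines: the streaming job runs compaction on a background executor so the checkpoint barrier is never blocked, while the standalone batch compactor runs it inline. A minimal stand-alone sketch of that dispatch pattern, using a generic java.util.concurrent executor as a stand-in for Hudi's NonThrownExecutor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DispatchSketch {
  private final boolean asyncCompaction;
  private final ExecutorService executor;

  DispatchSketch(boolean asyncCompaction) {
    this.asyncCompaction = asyncCompaction;
    // streaming mode: run compaction off-thread so the checkpoint barrier
    // can keep flowing; batch mode: no executor is needed at all.
    this.executor = asyncCompaction ? Executors.newSingleThreadExecutor() : null;
  }

  void processElement(Runnable compactionTask) {
    if (asyncCompaction) {
      executor.execute(compactionTask); // asynchronous, streaming path
    } else {
      compactionTask.run(); // synchronous path for the batch compactor
    }
  }
}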
CompactionCommitSink.java

@@ -122,8 +122,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
 
     // Whether to cleanup the old log file when compaction
     if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
-      this.writeClient.startAsyncCleaning();
-      this.writeClient.waitForCleaningFinish();
+      this.writeClient.clean();
     }
 
     // reset the status
CompactionPlanOperator.java

@@ -26,8 +26,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
@@ -84,21 +83,34 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
 
   @Override
   public void notifyCheckpointComplete(long checkpointId) throws IOException {
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-    // the last instant takes the highest priority.
-    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
-    String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null;
-    if (compactionInstantTime == null) {
+    try {
+      scheduleCompaction(checkpointId);
+    } catch (Throwable throwable) {
+      // make it fail safe
+      LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
+    }
+  }
+
+  private void scheduleCompaction(long checkpointId) throws IOException {
+    // if async compaction is on, schedule the compaction
+    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
+    final String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+
+    boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    if (!scheduled) {
       // do nothing.
       LOG.info("No compaction plan for checkpoint " + checkpointId);
       return;
     }
 
     if (this.compactionInstantTime != null
         && Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
       // do nothing
       LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore");
       return;
     }
 
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
     // generate compaction plan
     // should support configurable commit metadata
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
@@ -121,7 +133,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
 
       LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
           + "Clean the compaction plan in auxiliary path and cancels the compaction");
-      cleanInstant(table.getMetaClient(), instant);
+      CompactionUtil.cleanInstant(table.getMetaClient(), instant);
       return;
     }
 
@@ -138,22 +150,6 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     }
   }
 
-  private void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
-    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
-    try {
-      if (metaClient.getFs().exists(commitFilePath)) {
-        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
-        if (deleted) {
-          LOG.info("Removed instant " + instant);
-        } else {
-          throw new HoodieIOException("Could not delete instant " + instant);
-        }
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
-    }
-  }
-
   @VisibleForTesting
   public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
     this.output = output;
CompactionPlanSourceFunction.java

@@ -46,7 +46,8 @@ import static java.util.stream.Collectors.toList;
  * use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
  * as the instant time;</li>
  * <li>If the timeline has inflight instants,
- * use the {earliest inflight instant time - 1ms} as the instant time.</li>
+ * use the median instant time between [last complete instant time, earliest inflight instant time]
+ * as the instant time.</li>
  * </ul>
  */
 public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
FlinkCompactionConfig.java

@@ -101,6 +101,8 @@ public class FlinkCompactionConfig extends Configuration {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
     conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
     conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
+    // use synchronous compaction always
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
 
     return conf;
   }
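Because toFlinkConfig now pins compaction to synchronous mode, CompactFunction always takes its inline branch when driven by this config. An illustrative usage sketch, assuming FlinkCompactionConfig exposes a default constructor as the test code later in this diff suggests:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;

class SyncCompactionConfSketch {
  static Configuration forTable(String tablePath) {
    FlinkCompactionConfig cfg = new FlinkCompactionConfig(); // assumed default constructor
    cfg.path = tablePath; // hypothetical table path
    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
    // toFlinkConfig now forces async compaction off for the standalone job:
    assert !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
    return conf;
  }
}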
HoodieFlinkCompactor.java

@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
@@ -36,12 +35,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Flink hudi compaction program that can be executed manually.
 */
@@ -72,19 +68,15 @@ public class HoodieFlinkCompactor {
 
     // judge whether have operation
     // to compute the compaction instant time and do compaction.
-    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
     HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
-    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
-
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-    // the last instant takes the highest priority.
-    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
-    String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null;
-    if (compactionInstantTime == null) {
+    boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    if (!scheduled) {
       // do nothing.
       LOG.info("No compaction plan for this job ");
       return;
     }
 
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
     // generate compaction plan
     // should support configurable commit metadata
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
@@ -108,7 +100,7 @@ public class HoodieFlinkCompactor {
 
       LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
          + "Clean the compaction plan in auxiliary path and cancels the compaction");
-      cleanInstant(table.getMetaClient(), instant);
+      CompactionUtil.cleanInstant(table.getMetaClient(), instant);
      return;
     }
@@ -118,7 +110,7 @@ public class HoodieFlinkCompactor {
         .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
+            new ProcessOperator<>(new CompactFunction(conf)))
         .setParallelism(compactionPlan.getOperations().size())
         .addSink(new CompactionCommitSink(conf))
         .name("clean_commits")
@@ -127,20 +119,4 @@ public class HoodieFlinkCompactor {
 
     env.execute("flink_hudi_compaction");
   }
-
-  private static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
-    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
-    try {
-      if (metaClient.getFs().exists(commitFilePath)) {
-        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
-        if (deleted) {
-          LOG.info("Removed instant " + instant);
-        } else {
-          throw new HoodieIOException("Could not delete instant " + instant);
-        }
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
-    }
-  }
 }
NonKeyedCompactFunction.java (deleted)

@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.compact;
-
-import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
-import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
-import org.apache.hudi.util.StreamerUtil;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Function to execute the actual compaction task assigned by the compaction plan task.
- * The input compact event {@link CompactionPlanEvent}s were distributed evenly to this function.
- */
-public class NonKeyedCompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
-  private static final Logger LOG = LoggerFactory.getLogger(NonKeyedCompactFunction.class);
-
-  /**
-   * Config options.
-   */
-  private final Configuration conf;
-
-  /**
-   * Write Client.
-   */
-  private transient HoodieFlinkWriteClient writeClient;
-
-  /**
-   * Id of current subtask.
-   */
-  private int taskID;
-
-  /**
-   * Executor service to execute the compaction task.
-   */
-  private transient NonThrownExecutor executor;
-
-  public NonKeyedCompactFunction(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
-    this.executor = new NonThrownExecutor(LOG);
-  }
-
-  @Override
-  public void processElement(CompactionPlanEvent event, Context ctx, Collector<CompactionCommitEvent> collector) throws Exception {
-    final String instantTime = event.getCompactionInstantTime();
-    final CompactionOperation compactionOperation = event.getOperation();
-    doCompaction(instantTime, compactionOperation, collector);
-  }
-
-  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
-    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
-    List<WriteStatus> writeStatuses = compactor.compact(
-        new HoodieFlinkCopyOnWriteTable<>(
-            this.writeClient.getConfig(),
-            this.writeClient.getEngineContext(),
-            this.writeClient.getHoodieTable().getMetaClient()),
-        this.writeClient.getHoodieTable().getMetaClient(),
-        this.writeClient.getConfig(),
-        compactionOperation,
-        instantTime);
-    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
-  }
-
-  @VisibleForTesting
-  public void setExecutor(NonThrownExecutor executor) {
-    this.executor = executor;
-  }
-}
HoodieFlinkStreamer.java

@@ -117,16 +117,16 @@ public class HoodieFlinkStreamer {
         .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write")
         .setParallelism(numWriteTask);
-    if (StreamerUtil.needsScheduleCompaction(conf)) {
+    if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",
           TypeInformation.of(CompactionPlanEvent.class),
           new CompactionPlanOperator(conf))
           .uid("uid_compact_plan_generate")
           .setParallelism(1) // plan generate must be singleton
-          .keyBy(event -> event.getOperation().hashCode())
+          .rebalance()
          .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
-              new KeyedProcessOperator<>(new CompactFunction(conf)))
+              new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
           .addSink(new CompactionCommitSink(conf))
           .name("compact_commit")
HoodieTableSink.java

@@ -38,7 +38,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -99,16 +98,16 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
         .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
         .name("uid_hoodie_stream_write")
         .setParallelism(numWriteTasks);
-    if (StreamerUtil.needsScheduleCompaction(conf)) {
+    if (StreamerUtil.needsAsyncCompaction(conf)) {
       return pipeline.transform("compact_plan_generate",
           TypeInformation.of(CompactionPlanEvent.class),
           new CompactionPlanOperator(conf))
           .name("uid_compact_plan_generate")
           .setParallelism(1) // plan generate must be singleton
-          .keyBy(event -> event.getOperation().hashCode())
+          .rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
-              new KeyedProcessOperator<>(new CompactFunction(conf)))
+              new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
           .addSink(new CompactionCommitSink(conf))
           .name("compact_commit")
CompactionUtil.java

@@ -22,18 +22,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.stream.Collectors;
+import java.io.IOException;
 
 /**
  * Utilities for flink hudi compaction.
@@ -44,29 +43,24 @@ public class CompactionUtil {
 
   /**
    * Creates the metaClient.
-   * */
+   */
   public static HoodieTableMetaClient createMetaClient(Configuration conf) {
     return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
   }
 
   /**
    * Gets compaction Instant time.
-   * */
+   */
   public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
-    Option<HoodieInstant> hoodieInstantOption = metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
-    if (hoodieInstantOption.isPresent()) {
-      HoodieInstant firstInstant = hoodieInstantOption.get();
-      String newCommitTime = StreamerUtil.instantTimeSubtract(firstInstant.getTimestamp(), 1);
+    Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
+        .filterPendingExcludingCompaction().firstInstant();
+    Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
+        .filterCompletedAndCompactionInstants().lastInstant();
+    if (firstPendingInstant.isPresent() && lastCompleteInstant.isPresent()) {
+      String firstPendingTimestamp = firstPendingInstant.get().getTimestamp();
+      String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
       // Committed and pending compaction instants should have strictly lower timestamps
-      List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
-          .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
-          .filter(instant -> HoodieTimeline.compareTimestamps(
-              instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime))
-          .collect(Collectors.toList());
-      ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
-          "Following instants have timestamps >= compactionInstant (" + newCommitTime + ") Instants :"
-              + conflictingInstants);
-      return newCommitTime;
+      return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
     } else {
       return HoodieActiveTimeline.createNewInstantTime();
     }
@@ -83,4 +77,23 @@ public class CompactionUtil {
     Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
     conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
   }
+
+  /**
+   * Cleans the metadata file for given instant {@code instant}.
+   */
+  public static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
+    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
+        if (deleted) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
+    }
+  }
 }
StreamerUtil.java

@@ -276,7 +276,7 @@ public class StreamerUtil {
    * Returns whether needs to schedule the async compaction.
    * @param conf The flink configuration.
    */
-  public static boolean needsScheduleCompaction(Configuration conf) {
+  public static boolean needsAsyncCompaction(Configuration conf) {
     return conf.getString(FlinkOptions.TABLE_TYPE)
         .toUpperCase(Locale.ROOT)
         .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
@@ -304,10 +304,12 @@ public class StreamerUtil {
   }
 
   /**
-   * Subtract the old instant time with given milliseconds and returns.
-   * */
-  public static String instantTimeSubtract(String oldInstant, long milliseconds) {
-    long oldTime = Long.parseLong(oldInstant);
-    return String.valueOf(oldTime - milliseconds);
+   * Return the median instant time between the given two instant time.
+   */
+  public static String medianInstantTime(String highVal, String lowVal) {
+    long high = Long.parseLong(highVal);
+    long low = Long.parseLong(lowVal);
+    long median = low + (high - low) / 2;
+    return String.valueOf(median);
   }
 }
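A worked example of the new median rule, as a self-contained demo with hypothetical timestamps in Hudi's yyyyMMddHHmmss instant format (the helper body mirrors the diff above):

public final class MedianInstantTimeDemo {
  // Same arithmetic as StreamerUtil#medianInstantTime above: instant times
  // are numeric strings, so the median reduces to integer arithmetic.
  static String medianInstantTime(String highVal, String lowVal) {
    long high = Long.parseLong(highVal);
    long low = Long.parseLong(lowVal);
    return String.valueOf(low + (high - low) / 2);
  }

  public static void main(String[] args) {
    String earliestInflight = "20210628183109"; // hypothetical first pending instant
    String lastComplete = "20210628183101";     // hypothetical last completed instant
    // Prints 20210628183105: strictly between both neighbors, so the scheduled
    // compaction instant collides with neither a committed nor an inflight write.
    System.out.println(medianInstantTime(earliestInflight, lastComplete));
  }
}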
StreamWriteITCase.java

@@ -36,7 +36,6 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
-import org.apache.hudi.sink.compact.NonKeyedCompactFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -62,11 +61,9 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -176,6 +173,7 @@ public class StreamWriteITCase extends TestLogger {
     tableEnv.getConfig().getConfiguration()
         .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
     Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
@@ -189,9 +187,10 @@ public class StreamWriteITCase extends TestLogger {
         + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
         + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
         + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
-    TableResult tableResult = tableEnv.executeSql(insertInto);
-    TimeUnit.SECONDS.sleep(5);
-    tableResult.await();
+    tableEnv.executeSql(insertInto).await();
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
 
     // Make configuration and setAvroSchema.
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -199,7 +198,6 @@ public class StreamWriteITCase extends TestLogger {
     cfg.path = tempFile.getAbsolutePath();
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
     conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition");
 
     // create metaClient
     HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
@@ -212,15 +210,11 @@ public class StreamWriteITCase extends TestLogger {
 
     // judge whether have operation
     // To compute the compaction instant time and do compaction.
-    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
     HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
-    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
+    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
 
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-    // the last instant takes the highest priority.
-    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
-    String compactionInstantTime = compactionInstant.get().getTimestamp();
 
     // generate compaction plan
     // should support configurable commit metadata
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
@@ -234,7 +228,7 @@ public class StreamWriteITCase extends TestLogger {
         .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new NonKeyedCompactFunction(conf)))
+            new ProcessOperator<>(new CompactFunction(conf)))
         .setParallelism(compactionPlan.getOperations().size())
         .addSink(new CompactionCommitSink(conf))
         .name("clean_commits")
@@ -314,10 +308,10 @@ public class StreamWriteITCase extends TestLogger {
         new CompactionPlanOperator(conf))
         .uid("uid_compact_plan_generate")
         .setParallelism(1) // plan generate must be singleton
-        .keyBy(event -> event.getOperation().hashCode())
+        .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
-            new KeyedProcessOperator<>(new CompactFunction(conf)))
+            new ProcessOperator<>(new CompactFunction(conf)))
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
         .setParallelism(1);
TestWriteMergeOnRead.java

@@ -28,11 +28,13 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -62,6 +64,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         new FlinkTaskContextSupplier(null));
   }
 
+  @Override
+  protected void setUp(Configuration conf) {
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+  }
+
   @Override
   protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
     HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
StreamWriteFunctionWrapper.java

@@ -31,6 +31,7 @@ import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing.
@@ -82,6 +84,8 @@ public class StreamWriteFunctionWrapper<I> {
 
   private CompactFunctionWrapper compactFunctionWrapper;
 
+  private final boolean asyncCompaction;
+
   public StreamWriteFunctionWrapper(String tablePath) throws Exception {
     this(tablePath, TestConfigurations.getDefaultConf(tablePath));
   }
@@ -103,6 +107,7 @@ public class StreamWriteFunctionWrapper<I> {
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
     this.functionInitializationContext = new MockFunctionInitializationContext();
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
+    this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
   }
 
@@ -131,7 +136,7 @@ public class StreamWriteFunctionWrapper<I> {
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.open(conf);
 
-    if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+    if (asyncCompaction) {
       compactFunctionWrapper.openFunction();
     }
   }
@@ -208,10 +213,19 @@ public class StreamWriteFunctionWrapper<I> {
 
   public void checkpointComplete(long checkpointId) {
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+    if (asyncCompaction) {
+      // sleep for a while to give a change for scheduling compaction,
+      // see HoodieActiveTimeline#createNewInstantTime for details.
+      try {
+        TimeUnit.SECONDS.sleep(2);
+      } catch (InterruptedException e) {
+        throw new HoodieException("Waiting for checkpoint success exception", e);
+      }
+    }
     coordinator.notifyCheckpointComplete(checkpointId);
     this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
     this.writeFunction.notifyCheckpointComplete(checkpointId);
-    if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+    if (asyncCompaction) {
       try {
         compactFunctionWrapper.compact(checkpointId);
       } catch (Exception e) {