1
0

[HUDI-4152] Flink offline compaction support compacting multi compaction plan at once (#5677)

* [HUDI-4152] Flink offline compaction allow compact multi compaction plan at once

* [HUDI-4152] Fix exception for duplicated uid when multi compaction plan are compacted

* [HUDI-4152] Provider UT & IT for compact multi compaction plan

* [HUDI-4152] Put multi compaction plans into one compaction plan source

* [HUDI-4152] InstantCompactionPlanSelectStrategy allow multi instant by using comma

* [HUDI-4152] Add IT for InstantCompactionPlanSelectStrategy
This commit is contained in:
2022-07-07 14:11:26 +08:00
committed by GitHub
parent 7eeaff9ee0
commit e74ad324c3
10 changed files with 655 additions and 39 deletions

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
/** /**
* Configurations for Hoodie Flink compaction. * Configurations for Hoodie Flink compaction.
@@ -110,6 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
description = "Min compaction interval of async compaction service, default 10 minutes") description = "Min compaction interval of async compaction service, default 10 minutes")
public Integer minCompactionIntervalSeconds = 600; public Integer minCompactionIntervalSeconds = 600;
@Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+ "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan."
+ "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)."
+ "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan"
+ "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant")
public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();
@Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction."
+ "It's only effective for MultiCompactionPlanSelectStrategy.")
public Integer compactionPlanMaxSelect = 10;
@Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than"
+ "one instant in a time by using comma."
+ "It's only effective for InstantCompactionPlanSelectStrategy.")
public String compactionPlanInstant;
@Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false) @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue(); public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();

View File

@@ -18,6 +18,11 @@
package org.apache.hudi.sink.compact; package org.apache.hudi.sink.compact;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hudi.async.HoodieAsyncTableService; import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -26,9 +31,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
@@ -42,9 +49,12 @@ import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/** /**
* Flink hudi compaction program that can be executed manually. * Flink hudi compaction program that can be executed manually.
@@ -218,74 +228,104 @@ public class HoodieFlinkCompactor {
} }
// fetch the instant based on the configured execution sequence // fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieTimeline timeline = table.getActiveTimeline();
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
if (!requested.isPresent()) { .select(timeline.filterPendingCompactionTimeline(), cfg);
if (requested.isEmpty()) {
// do nothing. // do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
return; return;
} }
String compactionInstantTime = requested.get().getTimestamp(); List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
compactionInstantTimes.forEach(timestamp -> {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
if (timeline.containsInstant(inflightInstant)) { if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]"); LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
table.rollbackInflightCompaction(inflightInstant); table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline(); table.getMetaClient().reloadActiveTimeline();
} }
});
// generate compaction plan // generate timestamp and compaction plan pair
// should support configurable commit metadata // should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
table.getMetaClient(), compactionInstantTime); .map(timestamp -> {
try {
return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
} catch (IOException e) {
throw new HoodieException(e);
}
})
// reject empty compaction plan
.filter(pair -> !(pair.getRight() == null
|| pair.getRight().getOperations() == null
|| pair.getRight().getOperations().isEmpty()))
.collect(Collectors.toList());
if (compactionPlan == null || (compactionPlan.getOperations() == null) if (compactionPlans.isEmpty()) {
|| (compactionPlan.getOperations().isEmpty())) {
// No compaction plan, do nothing and return. // No compaction plan, do nothing and return.
LOG.info("No compaction plan for instant " + compactionInstantTime); LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes));
return; return;
} }
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
for (HoodieInstant instant : instants) {
if (!pendingCompactionTimeline.containsInstant(instant)) { if (!pendingCompactionTimeline.containsInstant(instant)) {
// this means that the compaction plan was written to auxiliary path(.tmp) // this means that the compaction plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie), this usually happens when the job crush // but not the meta path(.hoodie), this usually happens when the job crush
// exceptionally. // exceptionally.
// clean the compaction plan in auxiliary path and cancels the compaction. // clean the compaction plan in auxiliary path and cancels the compaction.
LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the compaction plan in auxiliary path and cancels the compaction"); + "Clean the compaction plan in auxiliary path and cancels the compaction");
CompactionUtil.cleanInstant(table.getMetaClient(), instant); CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return; return;
} }
}
// get compactionParallelism. // get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
: conf.getInteger(FlinkOptions.COMPACTION_TASKS);
LOG.info("Start to compaction for instant " + compactionInstantTime); LOG.info("Start to compaction for instant " + compactionInstantTimes);
// Mark instant as compaction inflight // Mark instant as compaction inflight
for (HoodieInstant instant : instants) {
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
}
table.getMetaClient().reloadActiveTimeline(); table.getMetaClient().reloadActiveTimeline();
env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) // use side-output to make operations that is in the same plan to be placed in the same stream
// keyby() cannot sure that different operations are in the different stream
DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
.name("compaction_source") .name("compaction_source")
.uid("uid_compaction_source") .uid("uid_compaction_source");
.rebalance()
SingleOutputStreamOperator<Void> operator = source.rebalance()
.transform("compact_task", .transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class), TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf))) new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionParallelism) .setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf)) .process(new ProcessFunction<CompactionCommitEvent, Void>() {
.name("clean_commits") @Override
.uid("uid_clean_commits") public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
}
})
.name("group_by_compaction_plan")
.uid("uid_group_by_compaction_plan")
.setParallelism(1); .setParallelism(1);
env.execute("flink_hudi_compaction_" + compactionInstantTime); compactionPlans.forEach(pair ->
operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
.addSink(new CompactionCommitSink(conf))
.name("clean_commits " + pair.getLeft())
.uid("uid_clean_commits_" + pair.getLeft())
.setParallelism(1));
env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
} }
/** /**

View File

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact;
import static java.util.stream.Collectors.toList;
import java.util.List;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Flink hudi compaction source function.
*
* <P>This function read the compaction plan as {@link CompactionOperation}s then assign the compaction task
* event {@link CompactionPlanEvent} to downstream operators.
*
* <p>The compaction instant time is specified explicitly with strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
* as the instant time;</li>
* <li>If the timeline has inflight instants,
* use the median instant time between [last complete instant time, earliest inflight instant time]
* as the instant time.</li>
* </ul>
*/
public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {
protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class);
/**
* compaction plan instant -> compaction plan
*/
private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;
public MultiCompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
this.compactionPlans = compactionPlans;
}
@Override
public void open(Configuration parameters) throws Exception {
// no operation
}
@Override
public void run(SourceContext sourceContext) throws Exception {
for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
HoodieCompactionPlan compactionPlan = pair.getRight();
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanFunction compacting " + operations + " files");
for (CompactionOperation operation : operations) {
sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
}
}
}
@Override
public void close() throws Exception {
// no operation
}
@Override
public void cancel() {
// no operation
}
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact.strategy;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
/**
* Select all pending compaction plan to compact
*/
public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
@Override
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
return pendingCompactionTimeline.getInstants().collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact.strategy;
import java.util.List;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
/**
* CompactionRangeStrategy
*/
public interface CompactionPlanSelectStrategy {
/**
* Define how to select compaction plan to compact
*/
List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config);
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact.strategy;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Specify the compaction plan instant to compact
*/
public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
@Override
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
LOG.warn("None instant is selected");
return Collections.emptyList();
}
List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
return pendingCompactionTimeline.getInstants()
.filter(instant -> instants.contains(instant.getTimestamp()))
.collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact.strategy;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.util.CompactionUtil;
/**
* Select multi compaction plan to compact
*/
public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
@Override
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
if (CompactionUtil.isLIFO(config.compactionSeq)) {
Collections.reverse(pendingCompactionPlanInstants);
}
int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());
return pendingCompactionPlanInstants.subList(0, range);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact.strategy;
import java.util.Collections;
import java.util.List;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.util.CompactionUtil;
/**
* Select one compaction plan to compact
*/
public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
@Override
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
Option<HoodieInstant> compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq)
? pendingCompactionTimeline.lastInstant()
: pendingCompactionTimeline.firstInstant();
if (compactionPlanInstant.isPresent()) {
return Collections.singletonList(compactionPlanInstant.get());
}
return Collections.emptyList();
}
}

View File

@@ -18,6 +18,11 @@
package org.apache.hudi.sink.compact; package org.apache.hudi.sink.compact;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -25,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.CompactionUtil;
@@ -49,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -67,6 +74,8 @@ public class ITTestHoodieFlinkCompactor {
private static final Map<String, List<String>> EXPECTED2 = new HashMap<>(); private static final Map<String, List<String>> EXPECTED2 = new HashMap<>();
private static final Map<String, List<String>> EXPECTED3 = new HashMap<>();
static { static {
EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
@@ -77,6 +86,12 @@ public class ITTestHoodieFlinkCompactor {
EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2")); EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3")); EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3"));
EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4")); EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4"));
EXPECTED3.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED3.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
EXPECTED3.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
EXPECTED3.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
EXPECTED3.put("par5", Arrays.asList("id12,par5,id12,Tony,27,9000,par5", "id13,par5,id13,Jenny,72,10000,par5"));
} }
@TempDir @TempDir
@@ -203,4 +218,106 @@ public class ITTestHoodieFlinkCompactor {
TestData.checkWrittenFullData(tempFile, EXPECTED2); TestData.checkWrittenFullData(tempFile, EXPECTED2);
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
// Create hoodie table and insert into data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
tableEnv.getConfig().getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();
TimeUnit.SECONDS.sleep(3);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
CompactionUtil.setAvroSchema(conf, metaClient);
CompactionUtil.inferChangelogMode(conf, metaClient);
List<String> compactionInstantTimeList = new ArrayList<>(2);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
// insert a new record to new partition, so that we can generate a new compaction plan
String insertT1ForNewPartition = "insert into t1 values\n"
+ "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
+ "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
tableEnv.executeSql(insertT1ForNewPartition).await();
TimeUnit.SECONDS.sleep(3);
compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
for (String compactionInstantTime : compactionInstantTimeList) {
HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
compactionPlans.add(Pair.of(compactionInstantTime, plan));
}
// Mark instant as compaction inflight
for (String compactionInstantTime : compactionInstantTimeList) {
HoodieInstant hoodieInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
}
table.getMetaClient().reloadActiveTimeline();
DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
.name("compaction_source")
.uid("uid_compaction_source");
SingleOutputStreamOperator<Void> operator = source.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(1)
.process(new ProcessFunction<CompactionCommitEvent, Void>() {
@Override
public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
}
})
.name("group_by_compaction_plan")
.uid("uid_group_by_compaction_plan")
.setParallelism(1);
compactionPlans.forEach(pair ->
operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
.addSink(new CompactionCommitSink(conf))
.name("clean_commits " + pair.getLeft())
.uid("uid_clean_commits_" + pair.getLeft())
.setParallelism(1));
env.execute("flink_hudi_compaction");
writeClient.close();
TestData.checkWrittenFullData(tempFile, EXPECTED3);
}
private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
boolean scheduled = false;
// judge whether have operation
// To compute the compaction instant time and do compaction.
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTimeOption.isPresent()) {
scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
}
assertTrue(scheduled, "The compaction plan should be scheduled");
return compactionInstantTimeOption.get();
}
} }

View File

@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.compact;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy;
import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy;
import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;
import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Test case for every {@link CompactionPlanSelectStrategy} implements
*/
public class TestCompactionPlanSelectStrategy {
private HoodieTimeline timeline;
private HoodieTimeline emptyTimeline;
private HoodieTimeline allCompleteTimeline;
private static final HoodieInstant INSTANT_001 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
private static final HoodieInstant INSTANT_002 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "002");
private static final HoodieInstant INSTANT_003 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "003");
private static final HoodieInstant INSTANT_004 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "004");
private static final HoodieInstant INSTANT_005 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "005");
private static final HoodieInstant INSTANT_006 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "006");
@BeforeEach
public void beforeEach() {
timeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_005, INSTANT_006);
emptyTimeline = new MockHoodieActiveTimeline();
allCompleteTimeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_005);
}
@Test
void testSingleCompactionPlanSelectStrategy() {
HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy();
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline, compactionConfig));
compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
}
@Test
void testMultiCompactionPlanSelectStrategy() {
HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
compactionConfig.compactionPlanMaxSelect = 2;
MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy();
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
}
@Test
void testAllPendingCompactionPlanSelectStrategy() {
HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy();
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006},
strategy.select(pendingCompactionTimeline, compactionConfig));
HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
}
@Test
void testInstantCompactionPlanSelectStrategy() {
HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
compactionConfig.compactionPlanInstant = "004";
InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy();
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
compactionConfig.compactionPlanInstant = "002,003";
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
compactionConfig.compactionPlanInstant = "002,005";
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
compactionConfig.compactionPlanInstant = "005";
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig));
}
private void assertHoodieInstantsEquals(HoodieInstant[] expected, List<HoodieInstant> actual) {
assertEquals(expected.length, actual.size());
for (int index = 0; index < expected.length; index++) {
assertHoodieInstantEquals(expected[index], actual.get(index));
}
}
private void assertHoodieInstantEquals(HoodieInstant expected, HoodieInstant actual) {
assertEquals(expected.getState(), actual.getState());
assertEquals(expected.getAction(), actual.getAction());
assertEquals(expected.getTimestamp(), actual.getTimestamp());
}
private static final class MockHoodieActiveTimeline extends HoodieActiveTimeline {
public MockHoodieActiveTimeline(HoodieInstant... instants) {
super();
setInstants(Arrays.asList(instants));
}
}
}