[HUDI-2483] Infer changelog mode for flink compactor (#3706)
This commit is contained in:
@@ -149,7 +149,7 @@ public class TableSchemaResolver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Schema getTableAvroSchemaFromDataFile() throws Exception {
|
public Schema getTableAvroSchemaFromDataFile() throws Exception {
|
||||||
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
|
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -66,6 +66,9 @@ public class HoodieFlinkCompactor {
|
|||||||
// set table schema
|
// set table schema
|
||||||
CompactionUtil.setAvroSchema(conf, metaClient);
|
CompactionUtil.setAvroSchema(conf, metaClient);
|
||||||
|
|
||||||
|
// infer changelog mode
|
||||||
|
CompactionUtil.inferChangelogMode(conf, metaClient);
|
||||||
|
|
||||||
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
|
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
|
||||||
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
||||||
|
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import scala.Serializable;
|
||||||
|
|
||||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
|
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
|
||||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
|
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
|
||||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
|
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
|
||||||
@@ -66,7 +68,10 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_O
|
|||||||
* <li>use the file paths from #step 3 as the back-up of the filesystem view.</li>
|
* <li>use the file paths from #step 3 as the back-up of the filesystem view.</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
*/
|
*/
|
||||||
public class IncrementalInputSplits {
|
public class IncrementalInputSplits implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
|
private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final Path path;
|
private final Path path;
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.util;
|
package org.apache.hudi.util;
|
||||||
|
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
@@ -76,6 +77,21 @@ public class CompactionUtil {
|
|||||||
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
|
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Infers the changelog mode based on the data file schema (including metadata fields).
|
||||||
|
*
|
||||||
|
* <p>We can improve the code if the changelog mode is set up as table config.
|
||||||
|
*
|
||||||
|
* @param conf The configuration
|
||||||
|
* @param metaClient The meta client used to resolve the data file schema
|
||||||
|
*/
|
||||||
|
public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
|
||||||
|
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
|
||||||
|
Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
|
||||||
|
if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) {
|
||||||
|
conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleans the metadata file for given instant {@code instant}.
|
* Cleans the metadata file for given instant {@code instant}.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -18,27 +18,13 @@
|
|||||||
|
|
||||||
package org.apache.hudi.sink;
|
package org.apache.hudi.sink;
|
||||||
|
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|
||||||
import org.apache.hudi.common.util.CompactionUtils;
|
|
||||||
import org.apache.hudi.common.util.Option;
|
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.sink.compact.CompactFunction;
|
|
||||||
import org.apache.hudi.sink.compact.CompactionCommitEvent;
|
|
||||||
import org.apache.hudi.sink.compact.CompactionCommitSink;
|
|
||||||
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
|
|
||||||
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
|
|
||||||
import org.apache.hudi.sink.transform.ChainedTransformer;
|
import org.apache.hudi.sink.transform.ChainedTransformer;
|
||||||
import org.apache.hudi.sink.transform.Transformer;
|
import org.apache.hudi.sink.transform.Transformer;
|
||||||
import org.apache.hudi.sink.utils.Pipelines;
|
import org.apache.hudi.sink.utils.Pipelines;
|
||||||
import org.apache.hudi.table.HoodieFlinkTable;
|
|
||||||
import org.apache.hudi.util.AvroSchemaConverter;
|
import org.apache.hudi.util.AvroSchemaConverter;
|
||||||
import org.apache.hudi.util.CompactionUtil;
|
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
import org.apache.hudi.utils.TestConfigurations;
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
import org.apache.hudi.utils.TestData;
|
import org.apache.hudi.utils.TestData;
|
||||||
@@ -58,11 +44,6 @@ import org.apache.flink.streaming.api.CheckpointingMode;
|
|||||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
|
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
|
||||||
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
|
||||||
import org.apache.flink.table.api.EnvironmentSettings;
|
|
||||||
import org.apache.flink.table.api.TableEnvironment;
|
|
||||||
import org.apache.flink.table.api.config.ExecutionConfigOptions;
|
|
||||||
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
|
|
||||||
import org.apache.flink.table.data.GenericRowData;
|
import org.apache.flink.table.data.GenericRowData;
|
||||||
import org.apache.flink.table.data.RowData;
|
import org.apache.flink.table.data.RowData;
|
||||||
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
|
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
|
||||||
@@ -80,8 +61,6 @@ import java.util.Map;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Integration test for Flink Hoodie stream sink.
|
* Integration test for Flink Hoodie stream sink.
|
||||||
*/
|
*/
|
||||||
@@ -150,84 +129,6 @@ public class StreamWriteITCase extends TestLogger {
|
|||||||
testWriteToHoodie(null, EXPECTED);
|
testWriteToHoodie(null, EXPECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testHoodieFlinkCompactor() throws Exception {
|
|
||||||
// Create hoodie table and insert into data.
|
|
||||||
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
|
|
||||||
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
|
|
||||||
tableEnv.getConfig().getConfiguration()
|
|
||||||
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
|
|
||||||
Map<String, String> options = new HashMap<>();
|
|
||||||
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
|
|
||||||
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
|
||||||
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
|
|
||||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
|
||||||
tableEnv.executeSql(hoodieTableDDL);
|
|
||||||
String insertInto = "insert into t1 values\n"
|
|
||||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
|
|
||||||
+ "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
|
|
||||||
+ "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
|
|
||||||
+ "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
|
|
||||||
+ "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
|
|
||||||
+ "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
|
|
||||||
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
|
|
||||||
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
|
|
||||||
tableEnv.executeSql(insertInto).await();
|
|
||||||
|
|
||||||
// wait for the asynchronous commit to finish
|
|
||||||
TimeUnit.SECONDS.sleep(3);
|
|
||||||
|
|
||||||
// Make configuration and setAvroSchema.
|
|
||||||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
||||||
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
|
|
||||||
cfg.path = tempFile.getAbsolutePath();
|
|
||||||
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
|
|
||||||
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
|
|
||||||
|
|
||||||
// create metaClient
|
|
||||||
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
|
|
||||||
|
|
||||||
// set the table name
|
|
||||||
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
|
|
||||||
|
|
||||||
// set table schema
|
|
||||||
CompactionUtil.setAvroSchema(conf, metaClient);
|
|
||||||
|
|
||||||
// judge whether have operation
|
|
||||||
// To compute the compaction instant time and do compaction.
|
|
||||||
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
|
|
||||||
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
|
|
||||||
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
|
|
||||||
|
|
||||||
assertTrue(scheduled, "The compaction plan should be scheduled");
|
|
||||||
|
|
||||||
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
|
||||||
// generate compaction plan
|
|
||||||
// should support configurable commit metadata
|
|
||||||
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
|
|
||||||
table.getMetaClient(), compactionInstantTime);
|
|
||||||
|
|
||||||
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
|
|
||||||
// Mark instant as compaction inflight
|
|
||||||
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
|
|
||||||
|
|
||||||
env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
|
|
||||||
.name("compaction_source")
|
|
||||||
.uid("uid_compaction_source")
|
|
||||||
.rebalance()
|
|
||||||
.transform("compact_task",
|
|
||||||
TypeInformation.of(CompactionCommitEvent.class),
|
|
||||||
new ProcessOperator<>(new CompactFunction(conf)))
|
|
||||||
.setParallelism(compactionPlan.getOperations().size())
|
|
||||||
.addSink(new CompactionCommitSink(conf))
|
|
||||||
.name("clean_commits")
|
|
||||||
.uid("uid_clean_commits")
|
|
||||||
.setParallelism(1);
|
|
||||||
|
|
||||||
env.execute("flink_hudi_compaction");
|
|
||||||
TestData.checkWrittenFullData(tempFile, EXPECTED);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMergeOnReadWriteWithCompaction() throws Exception {
|
public void testMergeOnReadWriteWithCompaction() throws Exception {
|
||||||
int parallelism = 4;
|
int parallelism = 4;
|
||||||
|
|||||||
@@ -0,0 +1,146 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.sink.compact;
|
||||||
|
|
||||||
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.util.CompactionUtils;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
|
import org.apache.hudi.table.HoodieFlinkTable;
|
||||||
|
import org.apache.hudi.util.CompactionUtil;
|
||||||
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
|
import org.apache.hudi.utils.TestData;
|
||||||
|
import org.apache.hudi.utils.TestSQL;
|
||||||
|
|
||||||
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||||
|
import org.apache.flink.configuration.Configuration;
|
||||||
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||||
|
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
||||||
|
import org.apache.flink.table.api.EnvironmentSettings;
|
||||||
|
import org.apache.flink.table.api.TableEnvironment;
|
||||||
|
import org.apache.flink.table.api.config.ExecutionConfigOptions;
|
||||||
|
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
|
||||||
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* IT cases for {@code HoodieFlinkCompactor}.
|
||||||
|
*/
|
||||||
|
public class ITTestHoodieFlinkCompactor {
|
||||||
|
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
|
||||||
|
|
||||||
|
static {
|
||||||
|
EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
|
||||||
|
EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
|
||||||
|
EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
|
||||||
|
EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@TempDir
|
||||||
|
File tempFile;
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {true, false})
|
||||||
|
public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
|
||||||
|
// Create hoodie table and insert into data.
|
||||||
|
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
|
||||||
|
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
|
||||||
|
tableEnv.getConfig().getConfiguration()
|
||||||
|
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
|
||||||
|
Map<String, String> options = new HashMap<>();
|
||||||
|
options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
|
||||||
|
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
||||||
|
options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
|
||||||
|
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
|
||||||
|
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||||
|
tableEnv.executeSql(hoodieTableDDL);
|
||||||
|
tableEnv.executeSql(TestSQL.INSERT_T1).await();
|
||||||
|
|
||||||
|
// wait for the asynchronous commit to finish
|
||||||
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
|
||||||
|
// Make configuration and setAvroSchema.
|
||||||
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
|
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
|
||||||
|
cfg.path = tempFile.getAbsolutePath();
|
||||||
|
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
|
||||||
|
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
|
||||||
|
|
||||||
|
// create metaClient
|
||||||
|
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
|
||||||
|
|
||||||
|
// set the table name
|
||||||
|
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
|
||||||
|
|
||||||
|
// set table schema
|
||||||
|
CompactionUtil.setAvroSchema(conf, metaClient);
|
||||||
|
|
||||||
|
// infer changelog mode
|
||||||
|
CompactionUtil.inferChangelogMode(conf, metaClient);
|
||||||
|
|
||||||
|
// judge whether there is an operation
|
||||||
|
// To compute the compaction instant time and do compaction.
|
||||||
|
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
|
||||||
|
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
|
||||||
|
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
|
||||||
|
|
||||||
|
assertTrue(scheduled, "The compaction plan should be scheduled");
|
||||||
|
|
||||||
|
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
|
||||||
|
// generate compaction plan
|
||||||
|
// should support configurable commit metadata
|
||||||
|
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
|
||||||
|
table.getMetaClient(), compactionInstantTime);
|
||||||
|
|
||||||
|
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
|
||||||
|
// Mark instant as compaction inflight
|
||||||
|
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
|
||||||
|
|
||||||
|
env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
|
||||||
|
.name("compaction_source")
|
||||||
|
.uid("uid_compaction_source")
|
||||||
|
.rebalance()
|
||||||
|
.transform("compact_task",
|
||||||
|
TypeInformation.of(CompactionCommitEvent.class),
|
||||||
|
new ProcessOperator<>(new CompactFunction(conf)))
|
||||||
|
.setParallelism(compactionPlan.getOperations().size())
|
||||||
|
.addSink(new CompactionCommitSink(conf))
|
||||||
|
.name("clean_commits")
|
||||||
|
.uid("uid_clean_commits")
|
||||||
|
.setParallelism(1);
|
||||||
|
|
||||||
|
env.execute("flink_hudi_compaction");
|
||||||
|
TestData.checkWrittenFullData(tempFile, EXPECTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user