diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 70b820c86..51e3e2738 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -149,7 +149,7 @@ public class TableSchemaResolver {
}
}
- private Schema getTableAvroSchemaFromDataFile() throws Exception {
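+ /**
+ * Fetches the Avro schema as read from an existing data file.
+ *
+ * Public so that callers such as the Flink compactor can inspect the
+ * physical schema, including the metadata fields.
+ */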
+ public Schema getTableAvroSchemaFromDataFile() throws Exception {
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 18d49f1be..b23168e2d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -66,6 +66,9 @@ public class HoodieFlinkCompactor {
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);
+ // infer changelog mode
+ CompactionUtil.inferChangelogMode(conf, metaClient);
+
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 7d319203d..72d8dd6e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.io.Serializable;
+
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
@@ -66,7 +68,10 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_O
*
use the file paths from #step 3 as the back-up of the filesystem view.
*
*/
-public class IncrementalInputSplits {
+public class IncrementalInputSplits implements Serializable {
+
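+ // the splits generator is referenced from the streaming read source function
+ // and must therefore be serializable along with it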
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
private final Configuration conf;
private final Path path;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index a0de79d91..2d4a0c884 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -19,6 +19,7 @@
package org.apache.hudi.util;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -76,6 +77,21 @@ public class CompactionUtil {
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
}
+ /**
+ * Infers the changelog mode based on the data file schema (including metadata fields).
+ *
+ * The code can be simplified once the changelog mode is set up as a table config.
+ *
+ * @param conf The configuration
+ * @param metaClient The meta client
+ */
+ public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
+ TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+ Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
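+ // the operation metadata field is only written when changelog mode is enabled,
+ // so its presence in the data file schema implies changelog semantics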
+ if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) {
+ conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
+ }
+ }
+
/**
* Cleans the metadata file for given instant {@code instant}.
*/
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 1890d07d2..2c8fb490a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -18,27 +18,13 @@
package org.apache.hudi.sink;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
-import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
-import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
-import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
-import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -58,11 +44,6 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -80,8 +61,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
/**
* Integration test for Flink Hoodie stream sink.
*/
@@ -150,84 +129,6 @@ public class StreamWriteITCase extends TestLogger {
testWriteToHoodie(null, EXPECTED);
}
- @Test
- public void testHoodieFlinkCompactor() throws Exception {
- // Create hoodie table and insert into data.
- EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
- TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
- tableEnv.getConfig().getConfiguration()
- .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- Map<String, String> options = new HashMap<>();
- options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
- options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
- options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
- String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
- tableEnv.executeSql(hoodieTableDDL);
- String insertInto = "insert into t1 values\n"
- + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
- + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
- + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
- + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
- + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
- + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
- + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
- + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
- tableEnv.executeSql(insertInto).await();
-
- // wait for the asynchronous commit to finish
- TimeUnit.SECONDS.sleep(3);
-
- // Make configuration and setAvroSchema.
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- FlinkCompactionConfig cfg = new FlinkCompactionConfig();
- cfg.path = tempFile.getAbsolutePath();
- Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
- conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
-
- // create metaClient
- HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-
- // set the table name
- conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
-
- // set table schema
- CompactionUtil.setAvroSchema(conf, metaClient);
-
- // judge whether have operation
- // To compute the compaction instant time and do compaction.
- String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
- HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
- boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-
- assertTrue(scheduled, "The compaction plan should be scheduled");
-
- HoodieFlinkTable<?> table = writeClient.getHoodieTable();
- // generate compaction plan
- // should support configurable commit metadata
- HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
- table.getMetaClient(), compactionInstantTime);
-
- HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
- // Mark instant as compaction inflight
- table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
-
- env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
- .name("compaction_source")
- .uid("uid_compaction_source")
- .rebalance()
- .transform("compact_task",
- TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
- .setParallelism(compactionPlan.getOperations().size())
- .addSink(new CompactionCommitSink(conf))
- .name("clean_commits")
- .uid("uid_clean_commits")
- .setParallelism(1);
-
- env.execute("flink_hudi_compaction");
- TestData.checkWrittenFullData(tempFile, EXPECTED);
- }
-
@Test
public void testMergeOnReadWriteWithCompaction() throws Exception {
int parallelism = 4;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
new file mode 100644
index 000000000..8571e8f1b
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * IT cases for {@link HoodieFlinkCompactor}.
+ */
+public class ITTestHoodieFlinkCompactor {
+ private static final Map<String, List<String>> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
+ EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
+ EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
+ EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
+ }
+
+ @TempDir
+ File tempFile;
+
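+ // Parameterized so that the compaction pipeline runs both with and without
+ // changelog mode, exercising CompactionUtil#inferChangelogMode for both schemas.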
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
+ // Create the hoodie table and insert data.
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), String.valueOf(enableChangelog));
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+ tableEnv.executeSql(hoodieTableDDL);
+ tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+ // wait for the asynchronous commit to finish
+ TimeUnit.SECONDS.sleep(3);
+
+ // Build the compaction configuration and set the Avro schema.
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+ conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // infer changelog mode
+ CompactionUtil.inferChangelogMode(conf, metaClient);
+
+ // compute the compaction instant time and schedule the compaction
+ String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+ HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+ boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+
+ assertTrue(scheduled, "The compaction plan should be scheduled");
+
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ // generate compaction plan
+ // should support configurable commit metadata
+ HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+ table.getMetaClient(), compactionInstantTime);
+
+ HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+ // Mark instant as compaction inflight
+ table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+
+ env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
+ .name("compaction_source")
+ .uid("uid_compaction_source")
+ .rebalance()
+ .transform("compact_task",
+ TypeInformation.of(CompactionCommitEvent.class),
+ new ProcessOperator<>(new CompactFunction(conf)))
+ .setParallelism(compactionPlan.getOperations().size())
+ .addSink(new CompactionCommitSink(conf))
+ .name("clean_commits")
+ .uid("uid_clean_commits")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_compaction");
+ TestData.checkWrittenFullData(tempFile, EXPECTED);
+ }
+}