diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
new file mode 100644
index 000000000..6fe6482a0
--- /dev/null
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -0,0 +1,220 @@
+
+
+
+
+ hudi-client
+ org.apache.hudi
+ 0.6.1-SNAPSHOT
+
+ 4.0.0
+
+ hudi-flink-client
+ ${parent.version}
+
+ hudi-flink-client
+ jar
+
+
+
+
+ org.apache.hudi
+ hudi-client-common
+ ${parent.version}
+
+
+
+
+ org.apache.parquet
+ parquet-avro
+
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-client-common
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.hudi
+ hudi-hadoop-mr
+ ${project.version}
+ test
+
+
+
+
+ org.apache.hbase
+ hbase-testing-util
+ ${hbase.version}
+ test
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ javax.xml.bind
+ *
+
+
+
+
+
+
+ ${hive.groupid}
+ hive-exec
+ ${hive.version}
+ test
+ ${hive.exec.classifier}
+
+
+ ${hive.groupid}
+ hive-metastore
+ ${hive.version}
+ test
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
+ org.junit.platform
+ junit-platform-runner
+ test
+
+
+ org.junit.platform
+ junit-platform-suite-api
+ test
+
+
+ org.junit.platform
+ junit-platform-commons
+ test
+
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ compile
+
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+ test-compile
+
+
+
+ false
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+
+
+
+ src/main/resources
+
+
+ src/test/resources
+
+
+
+
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
new file mode 100644
index 000000000..5c52b5814
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.common.function.SerializableConsumer;
+import org.apache.hudi.client.common.function.SerializableFunction;
+import org.apache.hudi.client.common.function.SerializablePairFunction;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.util.Option;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper;
+import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapWrapper;
+
+/**
+ * A flink engine implementation of HoodieEngineContext.
+ */
+public class HoodieFlinkEngineContext extends HoodieEngineContext {
+
+  public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
+    this(new SerializableConfiguration(new Configuration()), taskContextSupplier);
+  }
+
+  public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) {
+    super(hadoopConf, taskContextSupplier);
+  }
+
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList());
+  }
+
+  @Override
+  public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
+    return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
+  }
+
+  @Override
+  public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
+    data.forEach(throwingForeachWrapper(consumer));
+  }
+
+  @Override
+  public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
+    Map<K, V> map = new HashMap<>();
+    data.stream().map(throwingMapToPairWrapper(func)).forEach(x -> map.put(x._1, x._2));
+    return map;
+  }
+
+  @Override
+  public void setProperty(EngineProperty key, String value) {
+    // no operation for now
+  }
+
+  @Override
+  public Option<String> getProperty(EngineProperty key) {
+    // no operation for now
+    return Option.empty();
+  }
+
+  @Override
+  public void setJobStatus(String activeModule, String activityDescription) {
+    // no operation for now
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
new file mode 100644
index 000000000..31b11a0d1
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common.function;
+
+import org.apache.hudi.exception.HoodieException;
+import scala.Tuple2;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * Function wrapper util class, which catches the exception thrown by input function and return a similar function
+ * with no exception thrown.
+ */
+public class FunctionWrapper {
+
+  public static <I, O> Function<I, O> throwingMapWrapper(SerializableFunction<I, O> throwingMapFunction) {
+    return v1 -> {
+      try {
+        return throwingMapFunction.apply(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing map", e);
+      }
+    };
+  }
+
+  public static <I, O> Function<I, Stream<O>> throwingFlatMapWrapper(SerializableFunction<I, Stream<O>> throwingFlatMapFunction) {
+    return v1 -> {
+      try {
+        return throwingFlatMapFunction.apply(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing flatMap", e);
+      }
+    };
+  }
+
+  public static <I> Consumer<I> throwingForeachWrapper(SerializableConsumer<I> throwingConsumer) {
+    return v1 -> {
+      try {
+        throwingConsumer.accept(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing foreach", e);
+      }
+    };
+  }
+
+  public static <I, K, V> Function<I, Tuple2<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
+    return v1 -> {
+      try {
+        return throwingPairFunction.call(v1);
+      } catch (Exception e) {
+        throw new HoodieException("Error occurs when executing mapToPair", e);
+      }
+    };
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/resources/log4j.properties b/hudi-client/hudi-flink-client/src/main/resources/log4j.properties
new file mode 100644
index 000000000..ff268faf6
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=INFO, A1
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
new file mode 100644
index 000000000..5d752ada0
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.common;
+
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Unit test against HoodieFlinkEngineContext.
+ */
+public class TestHoodieFlinkEngineContext {
+  private HoodieFlinkEngineContext context;
+
+  @BeforeEach
+  public void init() {
+    context = new HoodieFlinkEngineContext(new DummyTaskContextSupplier());
+  }
+
+  @Test
+  public void testMap() {
+    List<Integer> mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    List<Integer> result = context.map(mapList, x -> x + 1, 2);
+    result.removeAll(mapList);
+
+    Assertions.assertEquals(1, result.size());
+    Assertions.assertEquals(11, result.get(0));
+  }
+
+  @Test
+  public void testFlatMap() {
+    List<String> list1 = Arrays.asList("a", "b", "c");
+    List<String> list2 = Arrays.asList("d", "e", "f");
+    List<String> list3 = Arrays.asList("g", "h", "i");
+
+    List<List<String>> inputList = new ArrayList<>();
+    inputList.add(list1);
+    inputList.add(list2);
+    inputList.add(list3);
+
+    List<String> result = context.flatMap(inputList, Collection::stream, 2);
+
+    Assertions.assertEquals(9, result.size());
+  }
+
+  @Test
+  public void testForeach() {
+    List<Integer> mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    List<Integer> result = new ArrayList<>(10);
+    context.foreach(mapList, result::add, 2);
+
+    Assertions.assertEquals(result.size(), mapList.size());
+    Assertions.assertTrue(result.containsAll(mapList));
+  }
+
+  @Test
+  public void testMapToPair() {
+    List<String> mapList = Arrays.asList("hudi_flink", "hudi_spark");
+
+    Map<String, String> resultMap = context.mapToPair(mapList, x -> {
+      String[] splits = x.split("_");
+      return Tuple2.apply(splits[0], splits[1]);
+    }, 2);
+
+    Assertions.assertNotNull(resultMap.get("hudi"));
+  }
+
+  public static class DummyTaskContextSupplier extends TaskContextSupplier {
+
+    @Override
+    public Supplier<Integer> getPartitionIdSupplier() {
+      return null;
+    }
+
+    @Override
+    public Supplier<Integer> getStageIdSupplier() {
+      return null;
+    }
+
+    @Override
+    public Supplier<Long> getAttemptIdSupplier() {
+      return null;
+    }
+
+    @Override
+    public Option<String> getProperty(EngineProperty prop) {
+      return null;
+    }
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties b/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties
new file mode 100644
index 000000000..32af46209
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,31 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.hudi=DEBUG
+log4j.logger.org.apache.hadoop.hbase=ERROR
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 1ab04793a..e8ff9e905 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -33,5 +33,6 @@
hudi-client-common
hudi-spark-client
+ hudi-flink-client