From c7d962efffa9440746d54dd2bcba73a9df8b354a Mon Sep 17 00:00:00 2001 From: wangxianghu Date: Wed, 14 Oct 2020 09:30:49 +0800 Subject: [PATCH] [HUDI-1328] Introduce HoodieFlinkEngineContext to hudi-flink-client (#2161) --- hudi-client/hudi-flink-client/pom.xml | 220 ++++++++++++++++++ .../common/HoodieFlinkEngineContext.java | 89 +++++++ .../common/function/FunctionWrapper.java | 73 ++++++ .../src/main/resources/log4j.properties | 23 ++ .../common/TestHoodieFlinkEngineContext.java | 115 +++++++++ .../test/resources/log4j-surefire.properties | 31 +++ hudi-client/pom.xml | 1 + 7 files changed, 552 insertions(+) create mode 100644 hudi-client/hudi-flink-client/pom.xml create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java create mode 100644 hudi-client/hudi-flink-client/src/main/resources/log4j.properties create mode 100644 hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java create mode 100644 hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml new file mode 100644 index 000000000..6fe6482a0 --- /dev/null +++ b/hudi-client/hudi-flink-client/pom.xml @@ -0,0 +1,220 @@ + + + + + hudi-client + org.apache.hudi + 0.6.1-SNAPSHOT + + 4.0.0 + + hudi-flink-client + ${parent.version} + + hudi-flink-client + jar + + + + + org.apache.hudi + hudi-client-common + ${parent.version} + + + + + org.apache.parquet + parquet-avro + + + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-hadoop-mr + ${project.version} + test + + + + + org.apache.hbase + hbase-testing-util + ${hbase.version} + test + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + javax.xml.bind + * + + + + + + + ${hive.groupid} + hive-exec + ${hive.version} + test + ${hive.exec.classifier} + + + ${hive.groupid} + hive-metastore + ${hive.version} + test + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.mockito + mockito-junit-jupiter + test + + + org.junit.platform + junit-platform-runner + test + + + org.junit.platform + junit-platform-suite-api + test + + + org.junit.platform + junit-platform-commons + test + + + + + + + org.jacoco + jacoco-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + + + + src/main/resources + + + src/test/resources + + + + diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java new file mode 100644 index 000000000..5c52b5814 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.common.function.SerializableConsumer; +import org.apache.hudi.client.common.function.SerializableFunction; +import org.apache.hudi.client.common.function.SerializablePairFunction; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.util.Option; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * A flink engine implementation of HoodieEngineContext. + */ +public class HoodieFlinkEngineContext extends HoodieEngineContext { + + public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) { + this(new SerializableConfiguration(new Configuration()), taskContextSupplier); + } + + public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) { + super(hadoopConf, taskContextSupplier); + } + + @Override + public List map(List data, SerializableFunction func, int parallelism) { + return data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList()); + } + + @Override + public List flatMap(List data, SerializableFunction> func, int parallelism) { + return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList()); + } + + @Override + public void foreach(List data, SerializableConsumer consumer, int parallelism) { + data.forEach(throwingForeachWrapper(consumer)); + } + + @Override + public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { + Map map = new HashMap<>(); + data.stream().map(throwingMapToPairWrapper(func)).forEach(x -> map.put(x._1, x._2)); + return map; + } + + @Override + public void setProperty(EngineProperty key, String value) { + // no operation for now + } + + @Override + public Option getProperty(EngineProperty key) { + // no operation for now + return Option.empty(); + } + + @Override + public void setJobStatus(String activeModule, String activityDescription) { + // no operation for now + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java new file mode 100644 index 000000000..31b11a0d1 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.common.function; + +import org.apache.hudi.exception.HoodieException; +import scala.Tuple2; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * Function wrapper util class, which catches the exception thrown by input function and return a similar function + * with no exception thrown. + */ +public class FunctionWrapper { + + public static Function throwingMapWrapper(SerializableFunction throwingMapFunction) { + return v1 -> { + try { + return throwingMapFunction.apply(v1); + } catch (Exception e) { + throw new HoodieException("Error occurs when executing map", e); + } + }; + } + + public static Function> throwingFlatMapWrapper(SerializableFunction> throwingFlatMapFunction) { + return v1 -> { + try { + return throwingFlatMapFunction.apply(v1); + } catch (Exception e) { + throw new HoodieException("Error occurs when executing flatMap", e); + } + }; + } + + public static Consumer throwingForeachWrapper(SerializableConsumer throwingConsumer) { + return v1 -> { + try { + throwingConsumer.accept(v1); + } catch (Exception e) { + throw new HoodieException("Error occurs when executing foreach", e); + } + }; + } + + public static Function> throwingMapToPairWrapper(SerializablePairFunction throwingPairFunction) { + return v1 -> { + try { + return throwingPairFunction.call(v1); + } catch (Exception e) { + throw new HoodieException("Error occurs when executing mapToPair", e); + } + }; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/resources/log4j.properties b/hudi-client/hudi-flink-client/src/main/resources/log4j.properties new file mode 100644 index 000000000..ff268faf6 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=INFO, A1 +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java new file mode 100644 index 000000000..5d752ada0 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/common/TestHoodieFlinkEngineContext.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.common; + +import org.apache.hudi.common.util.Option; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Unit test against HoodieFlinkEngineContext. + */ +public class TestHoodieFlinkEngineContext { + private HoodieFlinkEngineContext context; + + @BeforeEach + public void init() { + context = new HoodieFlinkEngineContext(new DummyTaskContextSupplier()); + } + + @Test + public void testMap() { + List mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = context.map(mapList, x -> x + 1, 2); + result.removeAll(mapList); + + Assertions.assertEquals(1, result.size()); + Assertions.assertEquals(11, result.get(0)); + } + + @Test + public void testFlatMap() { + List list1 = Arrays.asList("a", "b", "c"); + List list2 = Arrays.asList("d", "e", "f"); + List list3 = Arrays.asList("g", "h", "i"); + + List> inputList = new ArrayList<>(); + inputList.add(list1); + inputList.add(list2); + inputList.add(list3); + + List result = context.flatMap(inputList, Collection::stream, 2); + + Assertions.assertEquals(9, result.size()); + } + + @Test + public void testForeach() { + List mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = new ArrayList<>(10); + context.foreach(mapList, result::add, 2); + + Assertions.assertEquals(result.size(), mapList.size()); + Assertions.assertTrue(result.containsAll(mapList)); + } + + @Test + public void testMapToPair() { + List mapList = Arrays.asList("hudi_flink", "hudi_spark"); + + Map resultMap = context.mapToPair(mapList, x -> { + String[] splits = x.split("_"); + return Tuple2.apply(splits[0], splits[1]); + }, 2); + + Assertions.assertNotNull(resultMap.get("hudi")); + } + + public static class DummyTaskContextSupplier extends TaskContextSupplier { + + @Override + public Supplier getPartitionIdSupplier() { + return null; + } + + @Override + public Supplier getStageIdSupplier() { + return null; + } + + @Override + public Supplier getAttemptIdSupplier() { + return null; + } + + @Override + public Option getProperty(EngineProperty prop) { + return null; + } + } +} diff --git a/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties b/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties new file mode 100644 index 000000000..32af46209 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/test/resources/log4j-surefire.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 1ab04793a..e8ff9e905 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -33,5 +33,6 @@ hudi-client-common hudi-spark-client + hudi-flink-client