From 52524b659d2cb64403e8ba87d2fefe6d536156e9 Mon Sep 17 00:00:00 2001
From: vinoyang
Date: Wed, 14 Jul 2021 23:01:52 +0800
Subject: [PATCH] [HUDI-2165] Support Transformer for HoodieFlinkStreamer (#3270)

* [HUDI-2165] Support Transformer for HoodieFlinkStreamer
---
 .../sink/transform/ChainedTransformer.java    |  51 +++++
 .../hudi/sink/transform/Transformer.java      |  35 ++++
 .../hudi/streamer/FlinkStreamerConfig.java    |   6 +
 .../hudi/streamer/HoodieFlinkStreamer.java    |  18 +-
 .../org/apache/hudi/util/StreamerUtil.java    |  19 ++
 .../apache/hudi/sink/StreamWriteITCase.java   | 178 ++++++++++++------
 6 files changed, 245 insertions(+), 62 deletions(-)
 create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
 create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
new file mode 100644
index 000000000..2fe2867b7
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/ChainedTransformer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.transform;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Transformer} that chains other {@link Transformer}s and applies them sequentially.
+ */
+public class ChainedTransformer implements Transformer {
+
+  private List<Transformer> transformers;
+
+  public ChainedTransformer(List<Transformer> transformers) {
+    this.transformers = transformers;
+  }
+
+  public List<String> getTransformersNames() {
+    return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
+  }
+
+  @Override
+  public DataStream<RowData> apply(DataStream<RowData> source) {
+    DataStream<RowData> dataStream = source;
+    for (Transformer t : transformers) {
+      dataStream = t.apply(dataStream);
+    }
+
+    return dataStream;
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
new file mode 100644
index 000000000..f40a838b0
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.transform;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Transform source stream to target stream before writing.
+ */
+public interface Transformer {
+
+  /**
+   * Transform source DataStream to target DataStream.
+   *
+   * @param source source DataStream
+   * @return the transformed DataStream
+   */
+  DataStream<RowData> apply(DataStream<RowData> source);
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index d81fd3d39..8229ffaef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -117,6 +117,12 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
   public Boolean commitOnErrors = false;
 
+  @Parameter(names = {"--transformer-class"},
+      description = "A subclass or a list of subclasses of org.apache.hudi.sink.transform.Transformer. "
+          + "Allows transforming the raw source DataStream into a target DataStream (conforming to the target schema) "
+          + "before writing. Default: not set. Pass a comma-separated list of subclass names to chain the transformations.")
+  public List<String> transformerClassNames = null;
+
   /**
    * Flink checkpoint interval.
   */
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index f8cf84085..20cd833af 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.streamer;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
@@ -30,6 +31,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
@@ -44,6 +46,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -85,7 +88,7 @@ public class HoodieFlinkStreamer {
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
 
-    DataStream<RowData> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
+    DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
             rowType,
            InternalTypeInfo.of(rowType),
             false,
             true,
             TimestampFormat.ISO_8601
         ), kafkaProps))
         .name("kafka_source")
-        .uid("uid_kafka_source")
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+        .uid("uid_kafka_source");
+
+    if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
+      Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
+      if (transformer.isPresent()) {
+        dataStream = transformer.get().apply(dataStream);
+      }
+    }
+
+    DataStream<HoodieRecord> hoodieDataStream = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       hoodieDataStream = hoodieDataStream.rebalance()
           .transform(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 9cbfe4f41..9b3c7ac15 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
@@ -37,6 +39,8 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -55,6 +59,8 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
 import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
@@ -305,4 +311,17 @@ public class StreamerUtil {
       throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
     }
   }
+
+  public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
+    try {
+      List<Transformer> transformers = new ArrayList<>();
+      for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
+        transformers.add(ReflectionUtils.loadClass(className));
+      }
+      return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
+    } catch (Throwable e) {
+      throw new IOException("Could not load transformer class(es) " + classNames, e);
+    }
+  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index be4c05219..6d802fefd 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -39,6 +39,8 @@ import org.apache.hudi.sink.compact.FlinkCompactionConfig;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
+import org.apache.hudi.sink.transform.Transformer;
+import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
@@ -66,13 +68,13 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -88,81 +90,67 @@ import java.util.concurrent.TimeUnit;
  */
 public class StreamWriteITCase extends TestLogger {
 
-  protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class);
-
   private static final Map<String, List<String>> EXPECTED = new HashMap<>();
 
+  private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();
+
+  private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>();
+
   static {
     EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
     EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
     EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
     EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
+
+    EXPECTED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1"));
+    EXPECTED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
+    EXPECTED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", "id6,par3,id6,Emma,21,6000,par3"));
+    EXPECTED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,45,7000,par4", "id8,par4,id8,Han,57,8000,par4"));
+
+    EXPECTED_CHAINED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,25,1000,par1", "id2,par1,id2,Stephen,35,2000,par1"));
+    EXPECTED_CHAINED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,55,3000,par2", "id4,par2,id4,Fabian,33,4000,par2"));
+    EXPECTED_CHAINED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,20,5000,par3", "id6,par3,id6,Emma,22,6000,par3"));
+    EXPECTED_CHAINED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,46,7000,par4", "id8,par4,id8,Han,58,8000,par4"));
   }
 
   @TempDir
   File tempFile;
 
   @Test
-  public void testWriteToHoodie() throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
-    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    execEnv.getConfig().disableObjectReuse();
-    execEnv.setParallelism(4);
-    // set up checkpoint interval
-    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
-    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+  public void testTransformerBeforeWriting() throws Exception {
+    Transformer transformer = (ds) -> ds.map((rowdata) -> {
+      if (rowdata instanceof GenericRowData) {
+        GenericRowData genericRD = (GenericRowData) rowdata;
+        // update the age field to age + 1
+        genericRD.setField(2, genericRD.getInt(2) + 1);
+        return genericRD;
+      } else {
+        throw new RuntimeException("Unrecognized row type information: " + rowdata.getClass().getSimpleName());
+      }
+    });
 
-    // Read from file source
-    RowType rowType =
-        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
-            .getLogicalType();
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
+    testWriteToHoodie(transformer, EXPECTED_TRANSFORMER);
+  }
 
-    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-        rowType,
-        InternalTypeInfo.of(rowType),
-        false,
-        true,
-        TimestampFormat.ISO_8601
-    );
-    String sourcePath = Objects.requireNonNull(Thread.currentThread()
-        .getContextClassLoader().getResource("test_source.data")).toString();
+  @Test
+  public void testChainedTransformersBeforeWriting() throws Exception {
+    Transformer t1 = (ds) -> ds.map((rowdata) -> {
+      if (rowdata instanceof GenericRowData) {
+        GenericRowData genericRD = (GenericRowData) rowdata;
+        // update the age field to age + 1
+        genericRD.setField(2, genericRD.getInt(2) + 1);
+        return genericRD;
+      } else {
+        throw new RuntimeException("Unrecognized row type: " + rowdata.getClass().getSimpleName());
+      }
+    });
 
-    DataStream<HoodieRecord> hoodieDataStream = execEnv
-        // use continuous file source to trigger checkpoint
-        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
-        .name("continuous_file_source")
-        .setParallelism(1)
-        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
-        .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+    ChainedTransformer chainedTransformer = new ChainedTransformer(Arrays.asList(t1, t1));
 
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
-          TypeInformation.of(HoodieRecord.class),
-          new ProcessOperator<>(new BootstrapFunction<>(conf)));
-    }
+    testWriteToHoodie(chainedTransformer, EXPECTED_CHAINED_TRANSFORMER);
+  }
 
-    DataStream<Object> pipeline = hoodieDataStream
-        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .uid("uid_bucket_assigner")
-        // shuffle by fileId(bucket id)
-        .keyBy(record -> record.getCurrentLocation().getFileId())
-        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write");
-    execEnv.addOperator(pipeline.getTransformation());
-
-    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
-    // wait for the streaming job to finish
-    client.getJobExecutionResult().get();
-
-    TestData.checkWrittenFullData(tempFile, EXPECTED);
+  @Test
+  public void testWriteToHoodieWithoutTransformer() throws Exception {
+    testWriteToHoodie(null, EXPECTED);
   }
 
   @Test
@@ -328,4 +316,76 @@ public class StreamWriteITCase extends TestLogger {
 
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
+
+  private void testWriteToHoodie(
+      Transformer transformer,
+      Map<String, List<String>> expected) throws Exception {
+
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf);
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    DataStream<RowData> dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4);
+
+    if (transformer != null) {
+      dataStream = transformer.apply(dataStream);
+    }
+
+    DataStream<HoodieRecord> hoodieDataStream = dataStream
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+          TypeInformation.of(HoodieRecord.class),
+          new ProcessOperator<>(new BootstrapFunction<>(conf)));
+    }
+
+    DataStream<Object> pipeline = hoodieDataStream
+        // Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
+        .keyBy(HoodieRecord::getRecordKey)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId (bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
+        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
+        .uid("uid_hoodie_stream_write");
+    execEnv.addOperator(pipeline.getTransformation());
+
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+    // wait for the streaming job to finish
+    client.getJobExecutionResult().get();
+
+    TestData.checkWrittenFullData(tempFile, expected);
+  }
 }
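
Usage sketch (editorial note, not part of the patch): with this change, a user-defined transformer is any class implementing org.apache.hudi.sink.transform.Transformer, wired in through the new --transformer-class flag. The class below mirrors the age-increment lambda from StreamWriteITCase; the package, class name, and field index 2 are hypothetical and depend on the source schema.

    package com.example; // hypothetical package

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.sink.transform.Transformer;

    // Hypothetical Transformer: bumps the integer age field
    // (position 2 in the demo schema) by one before writing.
    public class AgeIncrementTransformer implements Transformer {

      @Override
      public DataStream<RowData> apply(DataStream<RowData> source) {
        return source.map(row -> {
          // GenericRowData is mutable, so the field can be updated in place
          GenericRowData genericRow = (GenericRowData) row;
          genericRow.setField(2, genericRow.getInt(2) + 1);
          return (RowData) genericRow;
        });
      }
    }

Chaining follows from StreamerUtil.createTransformer: a comma-separated list of class names is wrapped in a ChainedTransformer and applied in order, e.g. (jar name and the remaining streamer flags elided):

    flink run ... org.apache.hudi.streamer.HoodieFlinkStreamer \
      ... \
      --transformer-class com.example.AgeIncrementTransformer,com.example.AgeIncrementTransformer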