1
0

[HUDI-2165] Support Transformer for HoodieFlinkStreamer (#3270)

* [HUDI-2165] Support Transformer for HoodieFlinkStreamer
This commit is contained in:
vinoyang
2021-07-14 23:01:52 +08:00
committed by GitHub
parent 632bfd1a65
commit 52524b659d
6 changed files with 245 additions and 62 deletions

View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.transform;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import java.util.List;
import java.util.stream.Collectors;
/**
 * A {@link Transformer} to chain other {@link Transformer}s and apply them sequentially.
 */
public class ChainedTransformer implements Transformer {

  // Transformers applied in list order; assigned once and never mutated afterwards.
  private final List<Transformer> transformers;

  /**
   * Instantiates a chained transformer.
   *
   * @param transformers The transformers to apply, in order
   */
  public ChainedTransformer(List<Transformer> transformers) {
    this.transformers = transformers;
  }

  /**
   * Returns the fully qualified class names of the chained transformers,
   * in application order (useful for logging/diagnostics).
   */
  public List<String> getTransformersNames() {
    return transformers.stream().map(t -> t.getClass().getName()).collect(Collectors.toList());
  }

  @Override
  public DataStream<RowData> apply(DataStream<RowData> source) {
    // Feed the output of each transformer into the next one.
    DataStream<RowData> dataStream = source;
    for (Transformer t : transformers) {
      dataStream = t.apply(dataStream);
    }
    return dataStream;
  }
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.transform;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
/**
 * Transform source stream to target stream before writing.
 *
 * <p>NOTE(review): implementations configured via {@code --transformer-class} are
 * instantiated reflectively, which presumably requires a public no-arg
 * constructor — confirm against {@code ReflectionUtils.loadClass}.
 */
@FunctionalInterface
public interface Transformer {

  /**
   * Transform source DataStream to target DataStream.
   *
   * @param source The raw source stream
   * @return the transformed stream
   */
  DataStream<RowData> apply(DataStream<RowData> source);
}

View File

@@ -117,6 +117,12 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
public Boolean commitOnErrors = false;
// Optional transformer chain applied to the raw source stream before writing;
// null (the default) means no transformation is applied.
@Parameter(names = {"--transformer-class"},
description = "A subclass or a list of subclasses of org.apache.hudi.sink.transform.Transformer"
+ ". Allows transforming raw source DataStream to a target DataStream (conforming to target schema) before "
+ "writing. Default : Not set. Pass a comma-separated list of subclass names to chain the transformations.")
public List<String> transformerClassNames = null;
/**
* Flink checkpoint interval.
*/

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.streamer;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
@@ -30,6 +31,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -44,6 +46,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -85,7 +88,7 @@ public class HoodieFlinkStreamer {
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
DataStream<HoodieRecord> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
@@ -95,8 +98,17 @@ public class HoodieFlinkStreamer {
TimestampFormat.ISO_8601
), kafkaProps))
.name("kafka_source")
.uid("uid_kafka_source")
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
.uid("uid_kafka_source");
if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
if (transformer.isPresent()) {
dataStream = transformer.get().apply(dataStream);
}
}
DataStream<HoodieRecord> hoodieDataStream = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.rebalance()
.transform(

View File

@@ -28,6 +28,8 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -37,6 +39,8 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
@@ -55,6 +59,8 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
@@ -305,4 +311,17 @@ public class StreamerUtil {
throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
}
}
/**
 * Instantiates the given {@link Transformer} implementation classes and chains them
 * into a single {@link ChainedTransformer}, preserving list order.
 *
 * @param classNames Fully qualified names of {@code Transformer} subclasses; may be null or empty
 * @return an Option of the chained transformer, or empty if no class names were given
 * @throws IOException if any class cannot be loaded or instantiated
 */
public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
  try {
    List<Transformer> transformers = new ArrayList<>();
    // Option.ofNullable guards against a null list from the CLI config.
    for (String className : Option.ofNullable(classNames).orElse(Collections.emptyList())) {
      transformers.add(ReflectionUtils.loadClass(className));
    }
    return transformers.isEmpty() ? Option.empty() : Option.of(new ChainedTransformer(transformers));
  } catch (Exception e) {
    // Catch Exception (not Throwable) so JVM Errors such as OOM still propagate.
    throw new IOException("Could not load transformer class(es) " + classNames, e);
  }
}
}

View File

@@ -39,6 +39,8 @@ import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
@@ -66,13 +68,13 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -88,81 +90,67 @@ import java.util.concurrent.TimeUnit;
*/
public class StreamWriteITCase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class);
// Expected per-partition rows for the baseline write path (no transformer applied).
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
// Expected rows when a single transformer bumps the age field by 1 (e.g. Danny 23 -> 24).
private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();
// Expected rows when the same transformer is chained twice, bumping age by 2 (e.g. Danny 23 -> 25).
private static final Map<String, List<String>> EXPECTED_CHAINED_TRANSFORMER = new HashMap<>();
static {
EXPECTED.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1"));
EXPECTED.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2"));
EXPECTED.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3"));
EXPECTED.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4"));
EXPECTED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,24,1000,par1", "id2,par1,id2,Stephen,34,2000,par1"));
EXPECTED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2"));
EXPECTED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,19,5000,par3", "id6,par3,id6,Emma,21,6000,par3"));
EXPECTED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,45,7000,par4", "id8,par4,id8,Han,57,8000,par4"));
EXPECTED_CHAINED_TRANSFORMER.put("par1", Arrays.asList("id1,par1,id1,Danny,25,1000,par1", "id2,par1,id2,Stephen,35,2000,par1"));
EXPECTED_CHAINED_TRANSFORMER.put("par2", Arrays.asList("id3,par2,id3,Julian,55,3000,par2", "id4,par2,id4,Fabian,33,4000,par2"));
EXPECTED_CHAINED_TRANSFORMER.put("par3", Arrays.asList("id5,par3,id5,Sophia,20,5000,par3", "id6,par3,id6,Emma,22,6000,par3"));
EXPECTED_CHAINED_TRANSFORMER.put("par4", Arrays.asList("id7,par4,id7,Bob,46,7000,par4", "id8,par4,id8,Han,58,8000,par4"));
}
@TempDir
File tempFile;
@Test
public void testWriteToHoodie() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(4);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
public void testTransformerBeforeWriting() throws Exception {
Transformer transformer = (ds) -> ds.map((rowdata) -> {
if (rowdata instanceof GenericRowData) {
GenericRowData genericRD = (GenericRowData) rowdata;
//update age field to age + 1
genericRD.setField(2, genericRD.getInt(2) + 1);
return genericRD;
} else {
throw new RuntimeException("Unrecognized row type information: " + rowdata.getClass().getSimpleName());
}
});
// Read from file source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
testWriteToHoodie(transformer, EXPECTED_TRANSFORMER);
}
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
);
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
@Test
public void testChainedTransformersBeforeWriting() throws Exception {
Transformer t1 = (ds) -> ds.map((rowdata) -> {
if (rowdata instanceof GenericRowData) {
GenericRowData genericRD = (GenericRowData) rowdata;
//update age field to age + 1
genericRD.setField(2, genericRD.getInt(2) + 1);
return genericRD;
} else {
throw new RuntimeException("Unrecognized row type : " + rowdata.getClass().getSimpleName());
}
});
DataStream<HoodieRecord> hoodieDataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
ChainedTransformer chainedTransformer = new ChainedTransformer(Arrays.asList(t1, t1));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
}
testWriteToHoodie(chainedTransformer, EXPECTED_CHAINED_TRANSFORMER);
}
DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner")
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write");
execEnv.addOperator(pipeline.getTransformation());
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
// wait for the streaming job to finish
client.getJobExecutionResult().get();
TestData.checkWrittenFullData(tempFile, EXPECTED);
@Test
public void testWriteToHoodieWithoutTransformer() throws Exception {
// Baseline case: no transformer, records are written as-is.
testWriteToHoodie(null, EXPECTED);
}
@Test
@@ -328,4 +316,76 @@ public class StreamWriteITCase extends TestLogger {
TestData.checkWrittenFullData(tempFile, EXPECTED);
}
/**
 * Shared driver: builds the full Flink write pipeline (file source -> optional
 * transformer -> RowData-to-HoodieRecord map -> bucket assigner -> stream write),
 * runs it to completion, and checks the written table contents.
 *
 * @param transformer Optional transformer applied to the source stream; null to skip
 * @param expected    Expected rows per partition after the job finishes
 */
private void testWriteToHoodie(
Transformer transformer,
Map<String, List<String>> expected) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(4);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Read from file source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
// Parses each JSON line from the test fixture into a RowData of the source schema.
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
);
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
DataStream<RowData> dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);
// Apply the (possibly chained) transformer before conversion to HoodieRecord.
if (transformer != null) {
dataStream = transformer.apply(dataStream);
}
DataStream<HoodieRecord> hoodieDataStream = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
// Optionally bootstrap the index from the existing table before writing.
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
}
DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner")
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write");
execEnv.addOperator(pipeline.getTransformation());
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
// wait for the streaming job to finish
client.getJobExecutionResult().get();
// Verify the fully-written table matches the expected per-partition rows.
TestData.checkWrittenFullData(tempFile, expected);
}
}