diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 5c76b4c99..4dc0604dd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -43,6 +43,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.schema.ChainedSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
@@ -53,6 +54,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
@@ -125,12 +127,23 @@ public class UtilHelpers {
     }
   }
 
-  public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassName, TypedProperties props) throws IOException {
+  /**
+   * Loads the configured {@code JsonKafkaSourcePostProcessor}s from a comma-separated list of
+   * class names and chains them in the listed order; returns null when none is configured.
+   */
+  public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
+    if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
+      return null;
+    }
+
     try {
-      return StringUtils.isNullOrEmpty(postProcessorClassName) ? null
-          : (JsonKafkaSourcePostProcessor) ReflectionUtils.loadClass(postProcessorClassName, props);
+      List<JsonKafkaSourcePostProcessor> processors = new ArrayList<>();
+      for (String className : postProcessorClassNames.split(",")) {
+        processors.add((JsonKafkaSourcePostProcessor) ReflectionUtils.loadClass(className.trim(), props));
+      }
+      return new ChainedJsonKafkaSourcePostProcessor(processors, props);
     } catch (Throwable e) {
-      throw new IOException("Could not load json kafka source post processor class " + postProcessorClassName, e);
+      throw new HoodieSourcePostProcessException("Could not load JsonKafkaSourcePostProcessor class(es) " + postProcessorClassNames, e);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/ChainedJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/ChainedJsonKafkaSourcePostProcessor.java
new file mode 100644
index 000000000..827325a0b
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/ChainedJsonKafkaSourcePostProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.List;
+
+/**
+ * A {@link JsonKafkaSourcePostProcessor} to chain other {@link JsonKafkaSourcePostProcessor}s and apply sequentially.
+ */
+public class ChainedJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
+
+  private final List<JsonKafkaSourcePostProcessor> processors;
+
+  public ChainedJsonKafkaSourcePostProcessor(List<JsonKafkaSourcePostProcessor> processors, TypedProperties props) {
+    super(props);
+    this.processors = processors;
+  }
+
+  @Override
+  public JavaRDD<String> process(JavaRDD<String> inputJsonRecords) {
+    // Feed each processor's output into the next one, in list order.
+    JavaRDD<String> targetRDD = inputJsonRecords;
+    for (JsonKafkaSourcePostProcessor processor : processors) {
+      targetRDD = processor.process(targetRDD);
+    }
+    return targetRDD;
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index b53564df3..bd150ed29 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -30,6 +30,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Objects;
+
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -96,6 +98,28 @@ public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
         () -> kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900));
   }
 
+  @Test
+  public void testChainedJsonKafkaSourcePostProcessor() {
+    // topic setup.
+    final String topic = TEST_TOPIC_PREFIX + "testChainedJsonKafkaSourcePostProcessor";
+    testUtils.createTopic(topic, 2);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
+
+    // processor class name setup
+    props.setProperty(JSON_KAFKA_PROCESSOR_CLASS_OPT.key(), SampleJsonKafkaSourcePostProcessor.class.getName()
+        + "," + DummyJsonKafkaSourcePostProcessor.class.getName());
+
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
+
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
+
+    // the DummyJsonKafkaSourcePostProcessor at the end of the chain drops every record
+    assertEquals(0, fetch1.getBatch().get().count());
+  }
+
 /**
  * JsonKafkaSourcePostProcessor that return a sub RDD of the incoming data which get the data from incoming data using
  * {org.apache.spark.api.java.JavaRDD#sample(boolean, double, long)} method.
@@ -112,4 +136,16 @@ public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
     }
   }
 
+  public static class DummyJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
+    public DummyJsonKafkaSourcePostProcessor(TypedProperties props) {
+      super(props);
+    }
+
+    @Override
+    public JavaRDD<String> process(JavaRDD<String> inputJsonRecords) {
+      // return empty RDD
+      return inputJsonRecords.map(x -> "").filter(x -> !Objects.equals(x, ""));
+    }
+  }
+
 }