1
0

[HUDI-801] Adding a way to post process schema after it is fetched (#1524)

* [HUDI-801] Adding a way to post process schema after it is fetched

Co-authored-by: Alex Filipchik <alex.filipchik@csscompany.com>
Co-authored-by: Balaji Varadarajan <balaji.varadarajan@robinhood.com>
This commit is contained in:
Alexander Filipchik
2020-09-19 11:18:36 -07:00
committed by GitHub
parent 7c45894f43
commit c8e19e2def
11 changed files with 286 additions and 32 deletions

View File

@@ -35,7 +35,12 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.hudi.utilities.transform.ChainedTransformer;
@@ -99,7 +104,7 @@ public class UtilHelpers {
}
public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
JavaSparkContext jssc) throws IOException {
JavaSparkContext jssc) throws IOException {
try {
return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
: (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
@@ -108,6 +113,13 @@ public class UtilHelpers {
}
}
/**
 * Instantiates the configured {@link SchemaPostProcessor} via reflection.
 *
 * @param schemaPostProcessorClass fully-qualified implementation class name; may be null or
 *                                 empty when no post processing is configured.
 * @param cfg  properties handed to the post processor's constructor.
 * @param jssc spark context handed to the post processor's constructor.
 * @return the post processor instance, or null when no class is configured.
 */
public static SchemaPostProcessor createSchemaPostProcessor(
    String schemaPostProcessorClass, TypedProperties cfg, JavaSparkContext jssc) {
  // Use isNullOrEmpty (consistent with createSchemaProvider above) so an empty config
  // value is treated like an unset one instead of failing reflection on "".
  return StringUtils.isNullOrEmpty(schemaPostProcessorClass)
      ? null
      : (SchemaPostProcessor) ReflectionUtils.loadClass(schemaPostProcessorClass, cfg, jssc);
}
public static Option<Transformer> createTransformer(List<String> classNames) throws IOException {
try {
List<Transformer> transformers = new ArrayList<>();
@@ -368,4 +380,35 @@ public class UtilHelpers {
throw new IOException("Could not load source selector class " + sourceSelectorClass, e);
}
}
/**
 * Unwraps a possibly-decorated schema provider and returns the underlying source provider.
 *
 * @param schemaProvider provider to unwrap; may be a post-processing or delegating wrapper.
 * @return the wrapped provider when {@code schemaProvider} is a known wrapper type,
 *         otherwise {@code schemaProvider} itself.
 */
public static SchemaProvider getOriginalSchemaProvider(SchemaProvider schemaProvider) {
  if (schemaProvider instanceof SchemaProviderWithPostProcessor) {
    return ((SchemaProviderWithPostProcessor) schemaProvider).getOriginalSchemaProvider();
  }
  if (schemaProvider instanceof DelegatingSchemaProvider) {
    return ((DelegatingSchemaProvider) schemaProvider).getSourceSchemaProvider();
  }
  return schemaProvider;
}
/**
 * Wraps the given provider so that the (optionally configured) schema post processor runs on
 * every schema it returns. Idempotent: an already-wrapped provider is returned as-is.
 *
 * @param provider provider to wrap; may be null, in which case null is returned.
 * @param cfg  properties; {@link Config#SCHEMA_POST_PROCESSOR_PROP} selects the processor class.
 * @param jssc spark context passed through to the post processor.
 * @return wrapped provider, or null when {@code provider} is null.
 */
public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcessor(SchemaProvider provider,
    TypedProperties cfg, JavaSparkContext jssc) {
  if (provider == null) {
    return null;
  }
  // Avoid double-wrapping when the caller already applied the decorator.
  if (provider instanceof SchemaProviderWithPostProcessor) {
    return (SchemaProviderWithPostProcessor) provider;
  }
  String postProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
  SchemaPostProcessor postProcessor = createSchemaPostProcessor(postProcessorClass, cfg, jssc);
  return new SchemaProviderWithPostProcessor(provider, Option.ofNullable(postProcessor));
}
/**
 * Builds a {@link RowBasedSchemaProvider} for the given struct type, with any configured
 * schema post processor applied on top.
 *
 * @param structType row schema to expose as an Avro schema provider.
 * @param cfg  properties used to look up the post processor configuration.
 * @param jssc spark context passed through to the post processor.
 * @return the (possibly post-processing) schema provider.
 */
public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
    TypedProperties cfg, JavaSparkContext jssc) {
  return wrapSchemaProviderWithPostProcessor(new RowBasedSchemaProvider(structType), cfg, jssc);
}
}

View File

@@ -49,7 +49,6 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.transform.Transformer;
@@ -323,7 +322,7 @@ public class DeltaSync implements Serializable {
transformed
.map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
dataAndCheckpoint.getSchemaProvider(),
new RowBasedSchemaProvider(r.schema())))
UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc)))
.orElse(dataAndCheckpoint.getSchemaProvider());
avroRDDOptional = transformed
.map(t -> AvroConversionUtils.createRdd(

View File

@@ -547,7 +547,8 @@ public class HoodieDeltaStreamer implements Serializable {
this.props = properties;
LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc);
deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
this::onInitializingWriteClient);

View File

@@ -20,7 +20,9 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
@@ -65,17 +67,16 @@ public final class SourceFormatAdapter {
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
rdd -> (
(r.getSchemaProvider() instanceof FilebasedSchemaProvider)
// If the source schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and Row schema
? AvroConversionUtils.createRdd(
rdd, r.getSchemaProvider().getSourceSchema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
: AvroConversionUtils.createRdd(
rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
))
rdd -> {
SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
return (originalProvider instanceof FilebasedSchemaProvider)
// If the source schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and Row schema
? AvroConversionUtils.createRdd(rdd, r.getSchemaProvider().getSourceSchema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() : AvroConversionUtils.createRdd(rdd,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD();
})
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
@@ -116,4 +117,4 @@ public final class SourceFormatAdapter {
throw new IllegalArgumentException("Unknown source type (" + source.getSourceType() + ")");
}
}
}
}

View File

@@ -48,4 +48,12 @@ public class DelegatingSchemaProvider extends SchemaProvider {
/** Returns the target schema by delegating to the configured target schema provider. */
public Schema getTargetSchema() {
  return targetSchemaProvider.getTargetSchema();
}
/** @return the provider this delegates source-schema lookups to. */
public SchemaProvider getSourceSchemaProvider() {
  return sourceSchemaProvider;
}
/** @return the provider this delegates target-schema lookups to. */
public SchemaProvider getTargetSchemaProvider() {
  return targetSchemaProvider;
}
}

View File

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
/**
* Used in {@link SchemaProvider} to modify schema before it is passed to the caller. Can be used to
* add marker fields in records with no fields, make everything optional, ...
*/
public abstract class SchemaPostProcessor implements Serializable {

  /** Configs supported. */
  public static class Config {
    // Fully-qualified class name of the SchemaPostProcessor implementation to load via reflection.
    public static final String SCHEMA_POST_PROCESSOR_PROP =
        "hoodie.deltastreamer.schemaprovider.schema_post_processor";
  }

  private static final long serialVersionUID = 1L;

  // Properties this post processor was constructed with, available to subclasses.
  protected TypedProperties config;
  // Spark context, available to subclasses that need cluster access.
  protected JavaSparkContext jssc;

  protected SchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
    this.config = props;
    this.jssc = jssc;
  }

  /**
   * Rewrites schema.
   *
   * @param schema input schema.
   * @return modified schema.
   */
  public abstract Schema processSchema(Schema schema);
}

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.schema;
import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;
/**
* A schema provider which applies schema post process hook on schema.
*/
public class SchemaProviderWithPostProcessor extends SchemaProvider {

  // Provider whose schemas are fed through the post processor.
  private final SchemaProvider schemaProvider;
  // Optional hook applied to every schema returned by the wrapped provider.
  private final Option<SchemaPostProcessor> schemaPostProcessor;

  public SchemaProviderWithPostProcessor(SchemaProvider schemaProvider,
      Option<SchemaPostProcessor> schemaPostProcessor) {
    super(null, null);
    this.schemaProvider = schemaProvider;
    this.schemaPostProcessor = schemaPostProcessor;
  }

  @Override
  public Schema getSourceSchema() {
    // Fetch the delegate schema exactly once: Option.orElse(..) evaluates its argument
    // eagerly, so the previous map(...).orElse(provider.getSourceSchema()) form invoked
    // the delegate twice whenever a post processor was configured.
    Schema sourceSchema = schemaProvider.getSourceSchema();
    return schemaPostProcessor.map(processor -> processor.processSchema(sourceSchema))
        .orElse(sourceSchema);
  }

  @Override
  public Schema getTargetSchema() {
    // Same single-fetch pattern as getSourceSchema().
    Schema targetSchema = schemaProvider.getTargetSchema();
    return schemaPostProcessor.map(processor -> processor.processSchema(targetSchema))
        .orElse(targetSchema);
  }

  /** @return the wrapped (pre-post-processing) schema provider. */
  public SchemaProvider getOriginalSchemaProvider() {
    return schemaProvider;
  }
}

View File

@@ -21,7 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
@@ -42,7 +42,8 @@ public abstract class RowSource extends Source<Dataset<Row>> {
protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit);
return res.getKey().map(dsr -> {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(dsr.schema());
SchemaProvider rowSchemaProvider =
UtilHelpers.createRowBasedSchemaProvider(dsr.schema(), props, sparkContext);
return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider);
}).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
/** Minimal test-only schema provider that always returns the Avro NULL schema. */
public class DummySchemaProvider extends SchemaProvider {

  public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  // Fixed NULL schema; inherits the base class behavior for the target schema.
  @Override
  public Schema getSourceSchema() {
    return Schema.create(Schema.Type.NULL);
  }
}

View File

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/** Verifies that a configured {@link SchemaPostProcessor} rewrites the provider's schema. */
public class TestSchemaPostProcessor extends UtilitiesTestBase {

  private final TypedProperties properties = new TypedProperties();

  @Test
  public void testPostProcessor() throws IOException {
    properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
    SchemaProvider provider =
        UtilHelpers.wrapSchemaProviderWithPostProcessor(
            UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
            properties, jsc);

    Schema schema = provider.getSourceSchema();
    // JUnit 5 assertEquals takes (expected, actual); the original had the arguments
    // swapped, which produces misleading failure messages.
    assertEquals(Type.RECORD, schema.getType());
    assertEquals("test", schema.getName());
    assertNotNull(schema.getField("testString"));
  }

  /** Post processor that ignores its input and returns a fixed single-field record schema. */
  public static class DummySchemaPostProcessor extends SchemaPostProcessor {

    public DummySchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
      super(props, jssc);
    }

    @Override
    public Schema processSchema(Schema schema) {
      return SchemaBuilder.record("test").fields().optionalString("testString").endRecord();
    }
  }
}

View File

@@ -39,10 +39,10 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
@@ -54,7 +54,6 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -1123,16 +1122,4 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}
public static class DummySchemaProvider extends SchemaProvider {
public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
}
@Override
public Schema getSourceSchema() {
return Schema.create(Schema.Type.NULL);
}
}
}