diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 210c94849..fd051ed09 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -43,6 +43,7 @@ import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; +import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.RowBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.InputBatch; @@ -97,6 +98,11 @@ public class DeltaSync implements Serializable { */ private transient SourceFormatAdapter formatAdapter; + /** + * User Provided Schema Provider. + */ + private transient SchemaProvider userProvidedSchemaProvider; + /** * Schema provider that supplies the command for reading the input and writing out the target table. */ @@ -162,20 +168,18 @@ public class DeltaSync implements Serializable { this.fs = fs; this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient; this.props = props; - this.schemaProvider = schemaProvider; + this.userProvidedSchemaProvider = schemaProvider; refreshTimeline(); + // Register User Provided schema first + registerAvroSchemas(schemaProvider); this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); this.keyGenerator = DataSourceUtils.createKeyGenerator(props); this.formatAdapter = new SourceFormatAdapter( UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider)); - this.conf = conf; - - // If schemaRegistry already resolved, setup write-client - setupWriteClient(); } /** @@ -218,8 +222,7 @@ public class DeltaSync implements Serializable { if (null != srcRecordsWithCkpt) { // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start // compactor - if (null == schemaProvider) { - // Set the schemaProvider if not user-provided + if (null == writeClient) { this.schemaProvider = srcRecordsWithCkpt.getKey(); // Setup HoodieWriteClient and compaction now that we decided on schema setupWriteClient(); @@ -280,26 +283,28 @@ public class DeltaSync implements Serializable { Option> transformed = dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props)); checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch(); - if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) { + if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) { // If the target schema is specified through Avro schema, // pass in the schema for the Row-to-Avro conversion // to avoid nullability mismatch between Avro schema and Row schema avroRDDOptional = transformed .map(t -> AvroConversionUtils.createRdd( - t, this.schemaProvider.getTargetSchema(), + t, this.userProvidedSchemaProvider.getTargetSchema(), HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); + schemaProvider = this.userProvidedSchemaProvider; } else { + // Use Transformed Row's schema if not overridden. If target schema is not specified + // default to RowBasedSchemaProvider + schemaProvider = + transformed + .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc, + dataAndCheckpoint.getSchemaProvider(), + new RowBasedSchemaProvider(r.schema()))) + .orElse(dataAndCheckpoint.getSchemaProvider()); avroRDDOptional = transformed .map(t -> AvroConversionUtils.createRdd( t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()); } - - // Use Transformed Row's schema if not overridden. If target schema is not specified - // default to RowBasedSchemaProvider - schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null - ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse( - dataAndCheckpoint.getSchemaProvider()) - : this.schemaProvider; } else { // Pull the data from the source & prepare the write InputBatch> dataAndCheckpoint = @@ -458,7 +463,7 @@ public class DeltaSync implements Serializable { * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of * this constraint. */ - public void setupWriteClient() { + private void setupWriteClient() { LOG.info("Setting up Hoodie Write Client"); if ((null != schemaProvider) && (null == writeClient)) { registerAvroSchemas(schemaProvider); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java new file mode 100644 index 000000000..43c64d0c2 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * SchemaProvider which uses separate Schema Providers for source and target. + */ +public class DelegatingSchemaProvider extends SchemaProvider { + + private final SchemaProvider sourceSchemaProvider; + private final SchemaProvider targetSchemaProvider; + + public DelegatingSchemaProvider(TypedProperties props, + JavaSparkContext jssc, + SchemaProvider sourceSchemaProvider, SchemaProvider targetSchemaProvider) { + super(props, jssc); + this.sourceSchemaProvider = sourceSchemaProvider; + this.targetSchemaProvider = targetSchemaProvider; + } + + @Override + public Schema getSourceSchema() { + return sourceSchemaProvider.getSourceSchema(); + } + + @Override + public Schema getTargetSchema() { + return targetSchemaProvider.getTargetSchema(); + } +}