
[HUDI-723] Register avro schema if inferred from SQL transformation (#1518)

* Register avro schema if inferred from SQL transformation
* Always create the HoodieWriteClient lazily; correctly set the schema provider and register avro schemas when using the SQL transformer

Co-authored-by: Alex Filipchik <alex.filipchik@csscompany.com>
Co-authored-by: Balaji Varadarajan <varadarb@uber.com>
Alexander Filipchik authored 2020-05-15 12:44:03 -07:00, committed by GitHub
parent a64afdfd17
commit 25e0b75b3d
2 changed files with 73 additions and 17 deletions
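
In short: DeltaSync used to register only the user-provided Avro schema, eagerly in its constructor, so a target schema inferred from a SQL transformation was never registered. After this change the write client is always created lazily, once the effective schema provider for the batch is known. A minimal, self-contained sketch of that pattern follows; every type in it is an illustrative stand-in, not a Hudi class:

import java.util.List;

// Sketch only: all types here are stand-ins, not Hudi classes.
class LazySyncSketch {
  interface SchemaProvider { String getTargetSchema(); }
  interface Batch { SchemaProvider resolveSchemaProvider(); List<String> records(); }

  static class WriteClient {
    private final String schema;
    WriteClient(String schema) { this.schema = schema; }
    void write(List<String> records) { /* write records using the resolved schema */ }
  }

  private SchemaProvider schemaProvider; // resolved per batch, not in the constructor
  private WriteClient writeClient;       // created lazily, once a schema is known

  void syncOnce(Batch batch) {
    if (writeClient == null) {
      // Resolve the schema first (user-provided, source-derived, or inferred
      // from transformed rows), then build the client from it.
      schemaProvider = batch.resolveSchemaProvider();
      writeClient = new WriteClient(schemaProvider.getTargetSchema());
    }
    writeClient.write(batch.records());
  }
}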

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

@@ -43,6 +43,7 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
@@ -97,6 +98,11 @@ public class DeltaSync implements Serializable {
    */
   private transient SourceFormatAdapter formatAdapter;
 
+  /**
+   * User Provided Schema Provider.
+   */
+  private transient SchemaProvider userProvidedSchemaProvider;
+
   /**
    * Schema provider that supplies the command for reading the input and writing out the target table.
    */
@@ -162,20 +168,18 @@
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.schemaProvider = schemaProvider;
+    this.userProvidedSchemaProvider = schemaProvider;
     refreshTimeline();
-    // Register User Provided schema first
-    registerAvroSchemas(schemaProvider);
     this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
     this.formatAdapter = new SourceFormatAdapter(
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
 
     this.conf = conf;
-    // If schemaRegistry already resolved, setup write-client
-    setupWriteClient();
   }
 
   /**
@@ -218,8 +222,7 @@
     if (null != srcRecordsWithCkpt) {
       // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
       // compactor
-      if (null == schemaProvider) {
-        // Set the schemaProvider if not user-provided
+      if (null == writeClient) {
         this.schemaProvider = srcRecordsWithCkpt.getKey();
         // Setup HoodieWriteClient and compaction now that we decided on schema
         setupWriteClient();
@@ -280,26 +283,28 @@
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
+      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
         // If the target schema is specified through Avro schema,
         // pass in the schema for the Row-to-Avro conversion
         // to avoid nullability mismatch between Avro schema and Row schema
         avroRDDOptional = transformed
             .map(t -> AvroConversionUtils.createRdd(
-                t, this.schemaProvider.getTargetSchema(),
+                t, this.userProvidedSchemaProvider.getTargetSchema(),
                 HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+        schemaProvider = this.userProvidedSchemaProvider;
       } else {
+        // Use Transformed Row's schema if not overridden. If target schema is not specified
+        // default to RowBasedSchemaProvider
+        schemaProvider =
+            transformed
+                .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
+                    dataAndCheckpoint.getSchemaProvider(),
+                    new RowBasedSchemaProvider(r.schema())))
+                .orElse(dataAndCheckpoint.getSchemaProvider());
         avroRDDOptional = transformed
             .map(t -> AvroConversionUtils.createRdd(
                 t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
       }
-      // Use Transformed Row's schema if not overridden. If target schema is not specified
-      // default to RowBasedSchemaProvider
-      schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
-          ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
-              dataAndCheckpoint.getSchemaProvider())
-          : this.schemaProvider;
     } else {
       // Pull the data from the source & prepare the write
       InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -458,7 +463,7 @@
    * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
    * this constraint.
    */
-  public void setupWriteClient() {
+  private void setupWriteClient() {
     LOG.info("Setting up Hoodie Write Client");
     if ((null != schemaProvider) && (null == writeClient)) {
       registerAvroSchemas(schemaProvider);
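
Taken together, the transformed-input path now picks the effective schema provider as sketched below. This is a condensed restatement of the hunks above for readability, not additional code from the commit; the helper method name is illustrative, while the fields and types are the ones visible in the diff:

// Condensed restatement (sketch) of DeltaSync's new schema resolution
// after a SQL transform; "resolveSchemaProvider" is a hypothetical name.
private SchemaProvider resolveSchemaProvider(InputBatch<Dataset<Row>> dataAndCheckpoint,
                                             Dataset<Row> transformedRows) {
  if (userProvidedSchemaProvider != null && userProvidedSchemaProvider.getTargetSchema() != null) {
    // An explicit target schema wins; it is also passed to the Row-to-Avro
    // conversion to avoid nullability mismatches between the two schemas.
    return userProvidedSchemaProvider;
  }
  // Otherwise the source schema still comes from the upstream provider, while
  // the target schema is inferred from the transformed Rows. Registering this
  // composite via setupWriteClient() -> registerAvroSchemas() is the fix:
  // the inferred schema now gets registered too.
  return new DelegatingSchemaProvider(props, jssc,
      dataAndCheckpoint.getSchemaProvider(),
      new RowBasedSchemaProvider(transformedRows.schema()));
}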

hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java (new file)

@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SchemaProvider which uses separate Schema Providers for source and target.
+ */
+public class DelegatingSchemaProvider extends SchemaProvider {
+
+  private final SchemaProvider sourceSchemaProvider;
+  private final SchemaProvider targetSchemaProvider;
+
+  public DelegatingSchemaProvider(TypedProperties props,
+                                  JavaSparkContext jssc,
+                                  SchemaProvider sourceSchemaProvider, SchemaProvider targetSchemaProvider) {
+    super(props, jssc);
+    this.sourceSchemaProvider = sourceSchemaProvider;
+    this.targetSchemaProvider = targetSchemaProvider;
+  }
+
+  @Override
+  public Schema getSourceSchema() {
+    return sourceSchemaProvider.getSourceSchema();
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    return targetSchemaProvider.getTargetSchema();
+  }
+}
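
A short usage sketch of the new provider (the wrapper class and method name below are illustrative; the Hudi types and constructors are the ones from this commit). It composes the upstream provider's source schema with a target schema inferred from the transformed Dataset, exactly as the DeltaSync else-branch above does:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class DelegatingSchemaProviderExample {

  // Source schema: whatever the upstream provider (e.g. registry-backed) says.
  // Target schema: inferred from the transformed rows via RowBasedSchemaProvider.
  static SchemaProvider composeFor(TypedProperties props, JavaSparkContext jssc,
                                   SchemaProvider upstream, Dataset<Row> transformed) {
    return new DelegatingSchemaProvider(props, jssc,
        upstream, new RowBasedSchemaProvider(transformed.schema()));
  }
}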