From e0ab89b3ac22207ff45cf3cae782d64b8be01bf1 Mon Sep 17 00:00:00 2001 From: Alexander Filipchik Date: Wed, 28 Aug 2019 04:48:38 -0700 Subject: [PATCH] [HUDI-223] Adding a way to infer target schema from the dataset after the transformation (#854) - Adding a way to decouple target and source schema providers - Adding flattening transformer --- .../utilities/deltastreamer/DeltaSync.java | 20 +++-- .../NullTargetSchemaRegistryProvider.java | 40 +++++++++ .../transform/FlatteningTransformer.java | 83 +++++++++++++++++++ .../utilities/TestFlatteningTransformer.java | 56 +++++++++++++ 4 files changed, 194 insertions(+), 5 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 075e1c954..b09301075 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -24,7 +24,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC import com.codahale.metrics.Timer; import java.io.IOException; import java.io.Serializable; -import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.function.Function; @@ -282,9 +282,14 @@ public class DeltaSync implements Serializable { AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD() ); // Use Transformed Row's schema if not overridden + // Use Transformed Row's schema if not overridden. 
If target schema is not specified + // default to RowBasedSchemaProvider schemaProvider = - this.schemaProvider == null ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())) - .orElse(dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider; + this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null + ? transformed + .map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())) + .orElse(dataAndCheckpoint.getSchemaProvider()) + : this.schemaProvider; } else { // Pull the data from the source & prepare the write InputBatch> dataAndCheckpoint = @@ -472,7 +477,7 @@ public class DeltaSync implements Serializable { .forTable(cfg.targetTableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withAutoCommit(false); - if (null != schemaProvider) { + if (null != schemaProvider && null != schemaProvider.getTargetSchema()) { builder = builder.withSchema(schemaProvider.getTargetSchema().toString()); } @@ -487,7 +492,12 @@ public class DeltaSync implements Serializable { private void registerAvroSchemas(SchemaProvider schemaProvider) { // register the schemas, so that shuffle does not serialize the full schemas if (null != schemaProvider) { - List schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema()); + List schemas = new ArrayList<>(); + schemas.add(schemaProvider.getSourceSchema()); + if (schemaProvider.getTargetSchema() != null) { + schemas.add(schemaProvider.getTargetSchema()); + } + log.info("Registering Schema :" + schemas); jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList()); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java new file mode 100644 index 000000000..109b499fa --- /dev/null +++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.avro.Schema; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.spark.api.java.JavaSparkContext; + +/** + * Schema provider that will force DeltaStreamer to infer target schema from the dataset. + * It can be used with SQL or Flattening transformers to avoid having a target schema in the schema + * registry. 
+ */ +public class NullTargetSchemaRegistryProvider extends SchemaRegistryProvider { + + public NullTargetSchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema getTargetSchema() { + return null; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java new file mode 100644 index 000000000..d029f6c65 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import java.util.UUID; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * Transformer that can flatten nested objects. 
It currently doesn't unnest arrays. + */ +public class FlatteningTransformer implements Transformer { + + private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_"; + private static volatile Logger log = LogManager.getLogger(FlatteningTransformer.class); + + /** Configs supported */ + @Override + public Dataset apply( + JavaSparkContext jsc, + SparkSession sparkSession, + Dataset rowDataset, + TypedProperties properties) { + + // tmp table name doesn't like dashes + String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); + log.info("Registering tmp table : " + tmpTable); + rowDataset.registerTempTable(tmpTable); + return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + + " from " + tmpTable); + } + + public String flattenSchema(StructType schema, String prefix) { + final StringBuilder selectSQLQuery = new StringBuilder(); + + for (StructField field : schema.fields()) { + final String fieldName = field.name(); + + // it is also possible to expand arrays by using Spark "expand" function. + // As it can increase data size significantly we later pass additional property with a + // list of arrays to expand. + final String colName = prefix == null ? fieldName : (prefix + "." + fieldName); + if (field.dataType().getClass().equals(StructType.class)) { + selectSQLQuery.append(flattenSchema((StructType) field.dataType(), colName)); + } else { + selectSQLQuery.append(colName); + selectSQLQuery.append(" as "); + selectSQLQuery.append(colName.replace(".", "_")); + } + + selectSQLQuery.append(","); + } + + if (selectSQLQuery.length() > 0) { + selectSQLQuery. 
deleteCharAt(selectSQLQuery.length() - 1); + } + + return selectSQLQuery.toString(); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java new file mode 100644 index 000000000..c5a2ab020 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities; + +import static org.junit.Assert.assertEquals; + +import org.apache.hudi.utilities.transform.FlatteningTransformer; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.Test; + +public class TestFlatteningTransformer { + + @Test + public void testFlatten() { + FlatteningTransformer transformer = new FlatteningTransformer(); + + // Init + StructField[] nestedStructFields = new StructField[]{ + new StructField("nestedIntColumn", DataTypes.IntegerType, true, Metadata.empty()), + new StructField("nestedStringColumn", DataTypes.StringType, true, Metadata.empty()), + }; + + StructField[] structFields = new StructField[]{ + new StructField("intColumn", DataTypes.IntegerType, true, Metadata.empty()), + new StructField("stringColumn", DataTypes.StringType, true, Metadata.empty()), + new StructField("nestedStruct", DataTypes.createStructType(nestedStructFields), true, Metadata.empty()) + }; + + StructType schema = new StructType(structFields); + String flattenedSql = transformer.flattenSchema(schema, null); + + assertEquals("intColumn as intColumn,stringColumn as stringColumn," + + "nestedStruct.nestedIntColumn as nestedStruct_nestedIntColumn," + + "nestedStruct.nestedStringColumn as nestedStruct_nestedStringColumn", + flattenedSql); + } +}