From 77f4e73615d0949eec70187e397ef17f62e908a0 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Fri, 11 Oct 2019 23:00:55 -0700 Subject: [PATCH] [HUDI-121] Fix licensing issues found during RC voting by general incubator group --- DISCLAIMER | 6 - DISCLAIMER-WIP | 26 + LICENSE | 118 ++++ .../org/apache/hudi/cli/SparkHelpers.scala | 11 +- .../hudi/common/TestRawTripPayload.java | 4 +- .../hudi/avro/HoodieAvroWriteSupport.java | 2 +- .../hudi/avro/MercifulJsonConverter.java | 302 +++++++-- .../apache/hudi/common/HoodieJsonPayload.java | 4 +- .../common/util/ObjectSizeCalculator.java | 19 - .../org/apache/hudi/common/util/Option.java | 361 +++------- .../apache/hudi/common/util/ParquetUtils.java | 30 +- .../hudi/common/util/SchemaTestUtil.java | 4 +- .../apache/hudi/AvroConversionHelper.scala | 302 +++++++++ .../org/apache/hudi/AvroConversionUtils.scala | 289 +-------- .../sources/helpers/AvroConvertor.java | 4 +- .../utilities/TestHoodieDeltaStreamer.java | 31 +- pom.xml | 7 +- style/eclipse-java-google-style.xml | 353 ---------- style/intellij-java-google-style.xml | 614 ------------------ 19 files changed, 830 insertions(+), 1657 deletions(-) delete mode 100644 DISCLAIMER create mode 100644 DISCLAIMER-WIP create mode 100644 hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala delete mode 100644 style/eclipse-java-google-style.xml delete mode 100644 style/intellij-java-google-style.xml diff --git a/DISCLAIMER b/DISCLAIMER deleted file mode 100644 index 1238d8ae8..000000000 --- a/DISCLAIMER +++ /dev/null @@ -1,6 +0,0 @@ -Apache Hudi (incubating) is an effort undergoing incubation at The Apache Software Foundation -(ASF), sponsored by the Apache Incubator PMC. Incubation is required of all newly accepted -projects until a further review indicates that the infrastructure, communications, and decision -making process have stabilized in a manner consistent with other successful ASF projects. While -incubation status is not necessarily a reflection of the completeness or stability of the code, -it does indicate that the project has yet to be fully endorsed by the ASF. diff --git a/DISCLAIMER-WIP b/DISCLAIMER-WIP new file mode 100644 index 000000000..62fbb5f40 --- /dev/null +++ b/DISCLAIMER-WIP @@ -0,0 +1,26 @@ +Apache Hudi (incubating) is an effort undergoing incubation +at The Apache Software Foundation (ASF), sponsored by the Apache Incubator. + +Incubation is required of all newly accepted projects until a further review +indicates that the infrastructure, communications, and decision making process +have stabilized in a manner consistent with other successful ASF projects. + +While incubation status is not necessarily a reflection of the +completeness or stability of the code, it does indicate that the +project has yet to be fully endorsed by the ASF. + +Some of the incubating project's releases may not be fully compliant +with ASF policy. For example, releases may have incomplete or +un-reviewed licensing conditions. What follows is a list of known +issues the project is currently aware of (note that this list, by +definition, is likely to be incomplete): + + * The LICENSE and NOTICE files may not be complete and will be fixed with the next release. + +If you are planning to incorporate this work into your +product or project, please be aware that you will need to conduct a +thorough licensing review to determine the overall implications of +including this work. For the current status of this project through the Apache +Incubator visit: + +http://incubator.apache.org/projects/hudi.html diff --git a/LICENSE b/LICENSE index f433b1a53..17461664d 100644 --- a/LICENSE +++ b/LICENSE @@ -175,3 +175,121 @@ of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +-------------------------------------------------------------------------------- + +This product includes code from Apache Hive. + +* org.apache.hadoop.hive.ql.io.CombineHiveInputFormat copied to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat + +Copyright: 2011-2019 The Apache Software Foundation +Home page: http://hive.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This product includes code from Apache SystemML. + +* org.apache.hudi.func.LazyIterableIterator adapted from org/apache/sysml/runtime/instructions/spark/data/LazyIterableIterator + +Copyright: 2015-2018 The Apache Software Foundation +Home page: https://systemml.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + +This product includes code from https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/objectsize/ObjectSizeCalculator.java with the following license + +================================================================================================= + Copyright 2011 Twitter, Inc. + ------------------------------------------------------------------------------------------------- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this work except in compliance with the License. + You may obtain a copy of the License in the LICENSE file, or at: + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================================= + +This product includes code from Databricks spark-avro with the below license + +* org.apache.hudi.AvroConversionHelper copied from classes in com/databricks/spark/avro package + + Copyright 2014 Databricks + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +-------------------------------------------------------------------------------- + +This product includes code from https://github.com/big-data-europe/README + +* docker/hoodie/hadoop/base/entrypoint.sh copied from https://github.com/big-data-europe/docker-hadoop/blob/master/base/entrypoint.sh + +which is under the MIT license (https://github.com/big-data-europe/README#license) + +The MIT License (MIT) + +Copyright (c) 2015 TENFORCE BVBA, INSTITUT FUR ANGEWANDTE INFORMATIK EV, +NATIONAL CENTER FOR SCIENTIFIC RESEARCH "DEMOKRITOS", SEMANTIC WEB COMPANY +GMBH, FRAUNHOFER-GESELLSCHAFT ZUR FOERDERUNG DER ANGEWANDTEN FORSCHUNG E.V, +ETHNIKO KAI KAPODISTRIAKO PANEPISTIMIO ATHINON, GEIE ERCIM + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +------------------------------------------------------------------------------- diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index f2bf69b2a..7dc8b2b60 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -17,6 +17,9 @@ package org.apache.hudi.cli +import java.util +import java.util.Map + import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord import org.apache.hadoop.conf.Configuration @@ -54,11 +57,6 @@ object SparkHelpers { } writer.close } - - def getBloomFilter(file: String, conf: Configuration): String = { - val footer = ParquetFileReader.readFooter(conf, new Path(file)); - return footer.getFileMetaData().getKeyValueMetaData().get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY) - } } @@ -124,8 +122,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) { * @return */ def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = { - val bfStr = SparkHelpers.getBloomFilter(file, conf) - val bf = new BloomFilter(bfStr) + val bf = ParquetUtils.readBloomFilterFromParquetMetadata(conf, new Path(file)) val foundCount = sqlContext.parquetFile(file) .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`") .collect().count(r => !bf.mightContain(r.getString(0))) diff --git a/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java b/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java index 074aad449..130fb5454 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/TestRawTripPayload.java @@ -95,8 +95,8 @@ public class TestRawTripPayload implements HoodieRecordPayload fieldTypeProcessors = getFieldTypeProcessors(); - public MercifulJsonConverter(Schema schema) { - this.baseSchema = schema; + private final ObjectMapper mapper; + + /** + * Build type processor map for each avro type. + */ + private static Map getFieldTypeProcessors() { + Map processorMap = + new ImmutableMap.Builder().put(Type.STRING, generateStringTypeHandler()) + .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler()) + .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler()) + .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler()) + .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler()) + .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler()) + .put(Type.FIXED, generateFixedTypeHandler()).build(); + return processorMap; } + /** + * Uses a default objectMapper to deserialize a json string + */ + public MercifulJsonConverter() { + this(new ObjectMapper()); + } - public GenericRecord convert(String json) throws IOException { + /** + * Allows a configured ObjectMapper to be passed for converting json records to avro record + */ + public MercifulJsonConverter(ObjectMapper mapper) { + this.mapper = mapper; + } + + /** + * Converts json to Avro generic record + * + * @param json Json record + * @param schema Schema + */ + public GenericRecord convert(String json, Schema schema) { try { - return convert(mapper.readValue(json, Map.class), baseSchema); + Map jsonObjectMap = mapper.readValue(json, Map.class); + return convertJsonToAvro(jsonObjectMap, schema); } catch (IOException e) { - throw new IOException("Failed to parse as Json: " + json + "\n\n" + e.getMessage()); + throw new HoodieIOException(e.getMessage(), e); } } - private GenericRecord convert(Map raw, Schema schema) throws IOException { - GenericRecord result = new GenericData.Record(schema); + private static GenericRecord convertJsonToAvro(Map inputJson, Schema schema) { + GenericRecord avroRecord = new GenericData.Record(schema); for (Schema.Field f : schema.getFields()) { - String name = f.name(); - Object rawValue = raw.get(name); - if (rawValue != null) { - result.put(f.pos(), typeConvert(rawValue, name, f.schema())); + Object val = inputJson.get(f.name()); + if (val != null) { + avroRecord.put(f.pos(), convertJsonToAvroField(val, f.name(), f.schema())); } } - - return result; + return avroRecord; } - private Object typeConvert(Object value, String name, Schema schema) throws IOException { + private static Schema getNonNull(Schema schema) { + List types = schema.getTypes(); + Schema.Type firstType = types.get(0).getType(); + return firstType.equals(Schema.Type.NULL) ? types.get(1) : types.get(0); + } + + private static boolean isOptional(Schema schema) { + return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2 + && (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) + || schema.getTypes().get(1).getType().equals(Schema.Type.NULL)); + } + + private static Object convertJsonToAvroField(Object value, String name, Schema schema) { + if (isOptional(schema)) { if (value == null) { return null; @@ -73,83 +120,204 @@ public class MercifulJsonConverter { } } else if (value == null) { // Always fail on null for non-nullable schemas - throw new JsonConversionException(null, name, schema); + throw new HoodieJsonToAvroConversionException(null, name, schema); } - switch (schema.getType()) { - case BOOLEAN: + JsonToAvroFieldProcessor processor = fieldTypeProcessors.get(schema.getType()); + if (null != processor) { + return processor.convertToAvro(value, name, schema); + } + throw new IllegalArgumentException("JsonConverter cannot handle type: " + schema.getType()); + } + + /** + * Base Class for converting json to avro fields + */ + private abstract static class JsonToAvroFieldProcessor implements Serializable { + + public Object convertToAvro(Object value, String name, Schema schema) { + Pair res = convert(value, name, schema); + if (!res.getLeft()) { + throw new HoodieJsonToAvroConversionException(value, name, schema); + } + return res.getRight(); + } + + protected abstract Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException; + } + + private static JsonToAvroFieldProcessor generateBooleanTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { if (value instanceof Boolean) { - return value; + return Pair.of(true, value); } - break; - case DOUBLE: + return Pair.of(false, null); + } + }; + } + + private static JsonToAvroFieldProcessor generateIntTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { if (value instanceof Number) { - return ((Number) value).doubleValue(); + return Pair.of(true, ((Number) value).intValue()); + } else if (value instanceof String) { + return Pair.of(true, Integer.valueOf((String) value)); } - break; - case FLOAT: + return Pair.of(false, null); + } + }; + } + + private static JsonToAvroFieldProcessor generateDoubleTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { if (value instanceof Number) { - return ((Number) value).floatValue(); + return Pair.of(true, ((Number) value).doubleValue()); + } else if (value instanceof String) { + return Pair.of(true, Double.valueOf((String) value)); } - break; - case INT: + return Pair.of(false, null); + } + }; + } + + private static JsonToAvroFieldProcessor generateFloatTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { if (value instanceof Number) { - return ((Number) value).intValue(); + return Pair.of(true, ((Number) value).floatValue()); + } else if (value instanceof String) { + return Pair.of(true, Float.valueOf((String) value)); } - break; - case LONG: + return Pair.of(false, null); + } + }; + } + + private static JsonToAvroFieldProcessor generateLongTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { if (value instanceof Number) { - return ((Number) value).longValue(); + return Pair.of(true, ((Number) value).longValue()); + } else if (value instanceof String) { + return Pair.of(true, Long.valueOf((String) value)); } - break; - case STRING: - return value.toString(); - case ENUM: + return Pair.of(false, null); + } + }; + } + + private static JsonToAvroFieldProcessor generateStringTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { + return Pair.of(true, value.toString()); + } + }; + } + + private static JsonToAvroFieldProcessor generateBytesTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { + return Pair.of(true, value.toString().getBytes()); + } + }; + } + + private static JsonToAvroFieldProcessor generateFixedTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { + byte[] src = value.toString().getBytes(); + byte[] dst = new byte[schema.getFixedSize()]; + System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), src.length)); + return Pair.of(true, dst); + } + }; + } + + private static JsonToAvroFieldProcessor generateEnumTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { if (schema.getEnumSymbols().contains(value.toString())) { - return new GenericData.EnumSymbol(schema, value.toString()); + return Pair.of(true, new GenericData.EnumSymbol(schema, value.toString())); } - throw new JsonConversionException(String.format("Symbol %s not in enum", value.toString()), + throw new HoodieJsonToAvroConversionException(String.format("Symbol %s not in enum", value.toString()), schema.getFullName(), schema); - case RECORD: - return convert((Map) value, schema); - case ARRAY: + } + }; + } + + private static JsonToAvroFieldProcessor generateRecordTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { + GenericRecord result = new GenericData.Record(schema); + return Pair.of(true, convertJsonToAvro((Map) value, schema)); + } + }; + } + + private static JsonToAvroFieldProcessor generateArrayTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { Schema elementSchema = schema.getElementType(); List listRes = new ArrayList(); for (Object v : (List) value) { - listRes.add(typeConvert(v, name, elementSchema)); + listRes.add(convertJsonToAvroField(v, name, elementSchema)); } - return listRes; - case MAP: + return Pair.of(true, listRes); + } + }; + } + + private static JsonToAvroFieldProcessor generateMapTypeHandler() { + return new JsonToAvroFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) + throws HoodieJsonToAvroConversionException { Schema valueSchema = schema.getValueType(); Map mapRes = new HashMap(); for (Map.Entry v : ((Map) value).entrySet()) { - mapRes.put(v.getKey(), typeConvert(v.getValue(), name, valueSchema)); + mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema)); } - return mapRes; - default: - throw new IllegalArgumentException("JsonConverter cannot handle type: " + schema.getType()); - } - throw new JsonConversionException(value, name, schema); + return Pair.of(true, mapRes); + } + }; } - private boolean isOptional(Schema schema) { - return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2 - && (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) - || schema.getTypes().get(1).getType().equals(Schema.Type.NULL)); - } - - private Schema getNonNull(Schema schema) { - List types = schema.getTypes(); - return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0); - } - - public static class JsonConversionException extends RuntimeException { + /** + * Exception Class for any schema conversion issue + */ + public static class HoodieJsonToAvroConversionException extends HoodieException { private Object value; private String fieldName; private Schema schema; - public JsonConversionException(Object value, String fieldName, Schema schema) { + public HoodieJsonToAvroConversionException(Object value, String fieldName, Schema schema) { this.value = value; this.fieldName = fieldName; this.schema = schema; @@ -157,7 +325,7 @@ public class MercifulJsonConverter { @Override public String toString() { - return String.format("Type conversion error for field %s, %s for %s", fieldName, value, schema); + return String.format("Json to Avro Type conversion error for field %s, %s for %s", fieldName, value, schema); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java index 238dc4ef1..54bdc0af1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java @@ -56,8 +56,8 @@ public class HoodieJsonPayload implements HoodieRecordPayload @Override public Option getInsertValue(Schema schema) throws IOException { - MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema); - return Option.of(jsonConverter.convert(getJsonData())); + MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); + return Option.of(jsonConverter.convert(getJsonData(), schema)); } private String getJsonData() throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java index 6eb6842b7..b8b2aefe7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java @@ -1,22 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -// COPIED FROM https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/objectsize/ -// ObjectSizeCalculator.java // ================================================================================================= // Copyright 2011 Twitter, Inc. // ------------------------------------------------------------------------------------------------- diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java index ecc01fb1f..083c3b428 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java @@ -24,291 +24,122 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Predicate; import java.util.function.Supplier; /** - * Copied from java.util.Optional and made Serializable along with methods to convert to/from standard Option + * Provides functionality same as java.util.Optional but is also made Serializable. Additional APIs are provided to + * convert to/from java.util.Optional */ public final class Option implements Serializable { private static final long serialVersionUID = 0L; - /** - * Common instance for {@code empty()}. - */ - private static final Option EMPTY = new Option<>(); + private static final Option NULL_VAL = new Option<>(); - /** - * If non-null, the value; if null, indicates no value is present - */ - private final T value; - - /** - * Constructs an empty instance. - * - * @implNote Generally only one empty instance, {@link Option#EMPTY}, should exist per VM. - */ - private Option() { - this.value = null; - } - - /** - * Returns an empty {@code Option} instance. No value is present for this Option. - * - * @param Type of the non-existent value - * @return an empty {@code Option} - * @apiNote Though it may be tempting to do so, avoid testing if an object is empty by comparing with {@code ==} - * against instances returned by {@code Option.empty()}. There is no guarantee that it is a singleton. - * Instead, use {@link #isPresent()}. - */ - public static Option empty() { - @SuppressWarnings("unchecked") - Option t = (Option) EMPTY; - return t; - } - - /** - * Constructs an instance with the value present. - * - * @param value the non-null value to be present - * @throws NullPointerException if value is null - */ - private Option(T value) { - this.value = Objects.requireNonNull(value); - } - - /** - * Returns an {@code Option} with the specified present non-null value. - * - * @param the class of the value - * @param value the value to be present, which must be non-null - * @return an {@code Option} with the value present - * @throws NullPointerException if value is null - */ - public static Option of(T value) { - return new Option<>(value); - } - - /** - * Returns an {@code Option} describing the specified value, if non-null, otherwise returns an empty {@code Option}. - * - * @param the class of the value - * @param value the possibly-null value to describe - * @return an {@code Option} with a present value if the specified value is non-null, otherwise an empty {@code - * Option} - */ - public static Option ofNullable(T value) { - return value == null ? empty() : of(value); - } - - /** - * If a value is present in this {@code Option}, returns the value, otherwise throws {@code NoSuchElementException}. - * - * @return the non-null value held by this {@code Option} - * @throws NoSuchElementException if there is no value present - * @see Option#isPresent() - */ - public T get() { - if (value == null) { - throw new NoSuchElementException("No value present"); - } - return value; - } - - /** - * Return {@code true} if there is a value present, otherwise {@code false}. - * - * @return {@code true} if there is a value present, otherwise {@code false} - */ - public boolean isPresent() { - return value != null; - } - - /** - * If a value is present, invoke the specified consumer with the value, otherwise do nothing. - * - * @param consumer block to be executed if a value is present - * @throws NullPointerException if value is present and {@code consumer} is null - */ - public void ifPresent(Consumer consumer) { - if (value != null) { - consumer.accept(value); - } - } - - /** - * If a value is present, and the value matches the given predicate, return an {@code Option} describing the value, - * otherwise return an empty {@code Option}. - * - * @param predicate a predicate to apply to the value, if present - * @return an {@code Option} describing the value of this {@code Option} if a value is present and the value matches - * the given predicate, otherwise an empty {@code Option} - * @throws NullPointerException if the predicate is null - */ - public Option filter(Predicate predicate) { - Objects.requireNonNull(predicate); - if (!isPresent()) { - return this; - } else { - return predicate.test(value) ? this : empty(); - } - } - - /** - * If a value is present, apply the provided mapping function to it, and if the result is non-null, return an {@code - * Option} describing the result. Otherwise return an empty {@code Option}. - * - * @param The type of the result of the mapping function - * @param mapper a mapping function to apply to the value, if present - * @return an {@code Option} describing the result of applying a mapping function to the value of this {@code Option}, - * if a value is present, otherwise an empty {@code Option} - * @throws NullPointerException if the mapping function is null - * @apiNote This method supports post-processing on optional values, without the need to explicitly check for a return - * status. For example, the following code traverses a stream of file names, selects one that has not yet - * been processed, and then opens that file, returning an {@code Option}: - * - *
-   * {@code
-   *     Option fis =
-   *         names.stream().filter(name -> !isProcessedYet(name))
-   *                       .findFirst()
-   *                       .map(name -> new FileInputStream(name));
-   * }
-   *          
- * - * Here, {@code findFirst} returns an {@code Option}, and then {@code map} returns an {@code - * Option} for the desired file if one exists. - */ - public Option map(Function mapper) { - Objects.requireNonNull(mapper); - if (!isPresent()) { - return empty(); - } else { - return Option.ofNullable(mapper.apply(value)); - } - } - - /** - * If a value is present, apply the provided {@code Option}-bearing mapping function to it, return that result, - * otherwise return an empty {@code Option}. This method is similar to {@link #map(Function)}, but the provided mapper - * is one whose result is already an {@code Option}, and if invoked, {@code flatMap} does not wrap it with an - * additional {@code Option}. - * - * @param The type parameter to the {@code Option} returned by - * @param mapper a mapping function to apply to the value, if present the mapping function - * @return the result of applying an {@code Option}-bearing mapping function to the value of this {@code Option}, if a - * value is present, otherwise an empty {@code Option} - * @throws NullPointerException if the mapping function is null or returns a null result - */ - public Option flatMap(Function> mapper) { - Objects.requireNonNull(mapper); - if (!isPresent()) { - return empty(); - } else { - return Objects.requireNonNull(mapper.apply(value)); - } - } - - /** - * Return the value if present, otherwise return {@code other}. - * - * @param other the value to be returned if there is no value present, may be null - * @return the value, if present, otherwise {@code other} - */ - public T orElse(T other) { - return value != null ? value : other; - } - - /** - * Return the value if present, otherwise invoke {@code other} and return the result of that invocation. - * - * @param other a {@code Supplier} whose result is returned if no value is present - * @return the value if present otherwise the result of {@code other.get()} - * @throws NullPointerException if value is not present and {@code other} is null - */ - public T orElseGet(Supplier other) { - return value != null ? value : other.get(); - } - - /** - * Return the contained value, if present, otherwise throw an exception to be created by the provided supplier. - * - * @param Type of the exception to be thrown - * @param exceptionSupplier The supplier which will return the exception to be thrown - * @return the present value - * @throws X if there is no value present - * @throws NullPointerException if no value is present and {@code exceptionSupplier} is null - * @apiNote A method reference to the exception constructor with an empty argument list can be used as the supplier. - * For example, {@code IllegalStateException::new} - */ - public T orElseThrow(Supplier exceptionSupplier) throws X { - if (value != null) { - return value; - } else { - throw exceptionSupplier.get(); - } - } - - /** - * Indicates whether some other object is "equal to" this Option. The other object is considered equal if: - *
    - *
  • it is also an {@code Option} and; - *
  • both instances have no value present or; - *
  • the present values are "equal to" each other via {@code equals()}. - *
- * - * @param obj an object to be tested for equality - * @return {code true} if the other object is "equal to" this object otherwise {@code false} - */ - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (!(obj instanceof Option)) { - return false; - } - - Option other = (Option) obj; - return Objects.equals(value, other.value); - } - - /** - * Returns the hash code value of the present value, if any, or 0 (zero) if no value is present. - * - * @return hash code value of the present value or 0 if no value is present - */ - @Override - public int hashCode() { - return Objects.hashCode(value); - } - - /** - * Returns a non-empty string representation of this Option suitable for debugging. The exact presentation format is - * unspecified and may vary between implementations and versions. - * - * @return the string representation of this instance - * @implSpec If a value is present the result must include its string representation in the result. Empty and present - * Optionals must be unambiguously differentiable. - */ - @Override - public String toString() { - return value != null ? String.format("Option[%s]", value) : "Option.empty"; - } + private final T val; /** * Convert to java Optional */ public Optional toJavaOptional() { - return Optional.ofNullable(value); + return Optional.ofNullable(val); } /** * Convert from java.util.Optional + * + * @param v java.util.Optional object + * @param type of the value stored in java.util.Optional object + * @return Option */ public static Option fromJavaOptional(Optional v) { return Option.ofNullable(v.orElse(null)); } + + private Option() { + this.val = null; + } + + private Option(T val) { + if (null == val) { + throw new NullPointerException("Expected a non-null value. Got null"); + } + this.val = val; + } + + public static Option empty() { + return (Option) NULL_VAL; + } + + public static Option of(T value) { + return new Option<>(value); + } + + public static Option ofNullable(T value) { + return null == value ? empty() : of(value); + } + + public boolean isPresent() { + return null != val; + } + + public T get() { + if (null == val) { + throw new NoSuchElementException("No value present in Option"); + } + return val; + } + + public void ifPresent(Consumer consumer) { + if (val != null) { + // process the value + consumer.accept(val); + } + } + + public Option map(Function mapper) { + if (!isPresent()) { + return empty(); + } else { + return Option.ofNullable(mapper.apply(val)); + } + } + + public T orElse(T other) { + return val != null ? val : other; + } + + public T orElseGet(Supplier other) { + return val != null ? val : other.get(); + } + + public T orElseThrow(Supplier exceptionSupplier) throws X { + if (val != null) { + return val; + } else { + throw exceptionSupplier.get(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Option option = (Option) o; + return Objects.equals(val, option.val); + } + + @Override + public int hashCode() { + return Objects.hash(val); + } + + @Override + public String toString() { + return "Option{" + "val=" + val + '}'; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 40d8d9f16..788e6070d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.util; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -116,15 +117,15 @@ public class ParquetUtils { return readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema(); } - private static List readParquetFooter(Configuration configuration, Path parquetFilePath, - String... footerNames) { - List footerVals = new ArrayList<>(); + private static Map readParquetFooter(Configuration configuration, boolean required, + Path parquetFilePath, String... footerNames) { + Map footerVals = new HashMap<>(); ParquetMetadata footer = readMetadata(configuration, parquetFilePath); Map metadata = footer.getFileMetaData().getKeyValueMetaData(); for (String footerName : footerNames) { if (metadata.containsKey(footerName)) { - footerVals.add(metadata.get(footerName)); - } else { + footerVals.put(footerName, metadata.get(footerName)); + } else if (required) { throw new MetadataNotFoundException( "Could not find index in Parquet footer. " + "Looked for key " + footerName + " in " + parquetFilePath); } @@ -140,21 +141,28 @@ public class ParquetUtils { * Read out the bloom filter from the parquet file meta data. */ public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { - String footerVal = - readParquetFooter(configuration, parquetFilePath, HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY) - .get(0); - return new BloomFilter(footerVal); + Map footerVals = + readParquetFooter(configuration, false, parquetFilePath, + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + if (null == footerVal) { + // We use old style key "com.uber.hoodie.bloomfilter" + footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + } + return footerVal != null ? new BloomFilter(footerVal) : null; } public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) { - List minMaxKeys = readParquetFooter(configuration, parquetFilePath, + Map minMaxKeys = readParquetFooter(configuration, true, parquetFilePath, HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); if (minMaxKeys.size() != 2) { throw new HoodieException( String.format("Could not read min/max record key out of footer correctly from %s. read) : %s", parquetFilePath, minMaxKeys)); } - return new String[] {minMaxKeys.get(0), minMaxKeys.get(1)}; + return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), + minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)}; } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/util/SchemaTestUtil.java index 7fbf78244..725c091a5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/SchemaTestUtil.java @@ -167,7 +167,7 @@ public class SchemaTestUtil { public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, String commitTime, String fileId) throws IOException { TestRecord record = new TestRecord(commitTime, recordNumber, fileId); - MercifulJsonConverter converter = new MercifulJsonConverter(schema); - return converter.convert(record.toJsonString()); + MercifulJsonConverter converter = new MercifulJsonConverter(); + return converter.convert(record.toJsonString(), schema); } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala new file mode 100644 index 000000000..97f89f945 --- /dev/null +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -0,0 +1,302 @@ +/* + * This code is copied from com.databricks:spark-avro with following license + * + * Copyright 2014 Databricks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import java.nio.ByteBuffer +import java.sql.{Date, Timestamp} +import java.util + +import com.databricks.spark.avro.SchemaConverters +import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.GenericData.{Fixed, Record} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.types._ + +import scala.collection.JavaConverters._ + +object AvroConversionHelper { + + /** + * + * Returns a converter function to convert row in avro format to GenericRow of catalyst. + * + * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in + * by user. + * @param targetSqlType Target catalyst sql type after the conversion. + * @return returns a converter function to convert row in avro format to GenericRow of catalyst. + */ + def createConverterToRow(sourceAvroSchema: Schema, + targetSqlType: DataType): AnyRef => AnyRef = { + + def createConverter(avroSchema: Schema, sqlType: DataType, path: List[String]): AnyRef => AnyRef = { + val avroType = avroSchema.getType + (sqlType, avroType) match { + // Avro strings are in Utf8, so we have to call toString on them + case (StringType, STRING) | (StringType, ENUM) => + (item: AnyRef) => if (item == null) null else item.toString + // Byte arrays are reused by avro, so we have to make a copy of them. + case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | + (FloatType, FLOAT) | (LongType, LONG) => + identity + case (BinaryType, FIXED) => + (item: AnyRef) => + if (item == null) { + null + } else { + item.asInstanceOf[Fixed].bytes().clone() + } + case (BinaryType, BYTES) => + (item: AnyRef) => + if (item == null) { + null + } else { + val byteBuffer = item.asInstanceOf[ByteBuffer] + val bytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(bytes) + bytes + } + + case (struct: StructType, RECORD) => + val length = struct.fields.length + val converters = new Array[AnyRef => AnyRef](length) + val avroFieldIndexes = new Array[Int](length) + var i = 0 + while (i < length) { + val sqlField = struct.fields(i) + val avroField = avroSchema.getField(sqlField.name) + if (avroField != null) { + val converter = createConverter(avroField.schema(), sqlField.dataType, + path :+ sqlField.name) + converters(i) = converter + avroFieldIndexes(i) = avroField.pos() + } else if (!sqlField.nullable) { + throw new IncompatibleSchemaException( + s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " + + "in Avro schema\n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + i += 1 + } + + (item: AnyRef) => { + if (item == null) { + null + } else { + val record = item.asInstanceOf[GenericRecord] + + val result = new Array[Any](length) + var i = 0 + while (i < converters.length) { + if (converters(i) != null) { + val converter = converters(i) + result(i) = converter(record.get(avroFieldIndexes(i))) + } + i += 1 + } + new GenericRow(result) + } + } + case (arrayType: ArrayType, ARRAY) => + val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType, + path) + val allowsNull = arrayType.containsNull + (item: AnyRef) => { + if (item == null) { + null + } else { + item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element => + if (element == null && !allowsNull) { + throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + elementConverter(element) + } + } + } + } + case (mapType: MapType, MAP) if mapType.keyType == StringType => + val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path) + val allowsNull = mapType.valueContainsNull + (item: AnyRef) => { + if (item == null) { + null + } else { + item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { x => + if (x._2 == null && !allowsNull) { + throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + (x._1.toString, valueConverter(x._2)) + } + }.toMap + } + } + case (sqlType, UNION) => + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { + val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) + if (remainingUnionTypes.size == 1) { + createConverter(remainingUnionTypes.head, sqlType, path) + } else { + createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path) + } + } else avroSchema.getTypes.asScala.map(_.getType) match { + case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path) + case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType => + (item: AnyRef) => { + item match { + case null => null + case l: java.lang.Long => l + case i: java.lang.Integer => new java.lang.Long(i.longValue()) + } + } + case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType => + (item: AnyRef) => { + item match { + case null => null + case d: java.lang.Double => d + case f: java.lang.Float => new java.lang.Double(f.doubleValue()) + } + } + case other => + sqlType match { + case t: StructType if t.fields.length == avroSchema.getTypes.size => + val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map { + case (field, schema) => + createConverter(schema, field.dataType, path :+ field.name) + } + + (item: AnyRef) => + if (item == null) { + null + } else { + val i = GenericData.get().resolveUnion(avroSchema, item) + val converted = new Array[Any](fieldConverters.length) + converted(i) = fieldConverters(i)(item) + new GenericRow(converted) + } + case _ => throw new IncompatibleSchemaException( + s"Cannot convert Avro schema to catalyst type because schema at path " + + s"${path.mkString(".")} is not compatible " + + s"(avroType = $other, sqlType = $sqlType). \n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + } + case (left, right) => + throw new IncompatibleSchemaException( + s"Cannot convert Avro schema to catalyst type because schema at path " + + s"${path.mkString(".")} is not compatible (avroType = $left, sqlType = $right). \n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + } + + createConverter(sourceAvroSchema, targetSqlType, List.empty[String]) + } + + def createConverterToAvro(dataType: DataType, + structName: String, + recordNamespace: String): Any => Any = { + dataType match { + case BinaryType => (item: Any) => + item match { + case null => null + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + } + case IntegerType | LongType | + FloatType | DoubleType | StringType | BooleanType => identity + case ByteType => (item: Any) => + if (item == null) null else item.asInstanceOf[Byte].intValue + case ShortType => (item: Any) => + if (item == null) null else item.asInstanceOf[Short].intValue + case _: DecimalType => (item: Any) => if (item == null) null else item.toString + case TimestampType => (item: Any) => + if (item == null) null else item.asInstanceOf[Timestamp].getTime + case DateType => (item: Any) => + if (item == null) null else item.asInstanceOf[Date].getTime + case ArrayType(elementType, _) => + val elementConverter = createConverterToAvro( + elementType, + structName, + getNewRecordNamespace(elementType, recordNamespace, structName)) + (item: Any) => { + if (item == null) { + null + } else { + val sourceArray = item.asInstanceOf[Seq[Any]] + val sourceArraySize = sourceArray.size + val targetList = new util.ArrayList[Any](sourceArraySize) + var idx = 0 + while (idx < sourceArraySize) { + targetList.add(elementConverter(sourceArray(idx))) + idx += 1 + } + targetList + } + } + case MapType(StringType, valueType, _) => + val valueConverter = createConverterToAvro( + valueType, + structName, + getNewRecordNamespace(valueType, recordNamespace, structName)) + (item: Any) => { + if (item == null) { + null + } else { + val javaMap = new util.HashMap[String, Any]() + item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => + javaMap.put(key, valueConverter(value)) + } + javaMap + } + } + case structType: StructType => + val builder = SchemaBuilder.record(structName).namespace(recordNamespace) + val schema: Schema = SchemaConverters.convertStructToAvro( + structType, builder, recordNamespace) + val fieldConverters = structType.fields.map(field => + createConverterToAvro( + field.dataType, + field.name, + getNewRecordNamespace(field.dataType, recordNamespace, field.name))) + (item: Any) => { + if (item == null) { + null + } else { + val record = new Record(schema) + val convertersIterator = fieldConverters.iterator + val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator + val rowIterator = item.asInstanceOf[Row].toSeq.iterator + + while (convertersIterator.hasNext) { + val converter = convertersIterator.next() + record.put(fieldNamesIterator.next(), converter(rowIterator.next())) + } + record + } + } + } + } +} diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index e119979a1..e857b2569 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -17,24 +17,14 @@ package org.apache.hudi -import java.nio.ByteBuffer -import java.sql.{Date, Timestamp} -import java.util - import com.databricks.spark.avro.SchemaConverters -import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException -import org.apache.avro.Schema.Type._ -import org.apache.avro.generic.GenericData.{Fixed, Record} -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.GenericRecord import org.apache.avro.{Schema, SchemaBuilder} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import scala.collection.JavaConverters._ - object AvroConversionUtils { @@ -43,15 +33,15 @@ object AvroConversionUtils { val encoder = RowEncoder.apply(dataType).resolveAndBind() df.queryExecution.toRdd.map(encoder.fromRow) .mapPartitions { records => - if (records.isEmpty) Iterator.empty - else { - val convertor = createConverterToAvro(dataType, structName, recordNamespace) - records.map { x => convertor(x).asInstanceOf[GenericRecord] } + if (records.isEmpty) Iterator.empty + else { + val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace) + records.map { x => convertor(x).asInstanceOf[GenericRecord] } + } } - } } - def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss : SparkSession): Dataset[Row] = { + def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = { if (rdd.isEmpty()) { ss.emptyDataFrame } else { @@ -60,7 +50,7 @@ object AvroConversionUtils { else { val schema = Schema.parse(schemaStr) val dataType = convertAvroSchemaToStructType(schema) - val convertor = createConverterToRow(schema, dataType) + val convertor = AvroConversionHelper.createConverterToRow(schema, dataType) records.map { x => convertor(x).asInstanceOf[Row] } } }, convertAvroSchemaToStructType(Schema.parse(schemaStr))).asInstanceOf[Dataset[Row]] @@ -77,269 +67,6 @@ object AvroConversionUtils { } } - /** - * NOTE : This part of code is copied from com.databricks.spark.avro.SchemaConverters.scala (133:310) (spark-avro) - * - * Returns a converter function to convert row in avro format to GenericRow of catalyst. - * - * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in - * by user. - * @param targetSqlType Target catalyst sql type after the conversion. - * @return returns a converter function to convert row in avro format to GenericRow of catalyst. - */ - def createConverterToRow(sourceAvroSchema: Schema, - targetSqlType: DataType): AnyRef => AnyRef = { - - def createConverter(avroSchema: Schema, - sqlType: DataType, path: List[String]): AnyRef => AnyRef = { - val avroType = avroSchema.getType - (sqlType, avroType) match { - // Avro strings are in Utf8, so we have to call toString on them - case (StringType, STRING) | (StringType, ENUM) => - (item: AnyRef) => if (item == null) null else item.toString - // Byte arrays are reused by avro, so we have to make a copy of them. - case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | - (FloatType, FLOAT) | (LongType, LONG) => - identity - case (BinaryType, FIXED) => - (item: AnyRef) => - if (item == null) { - null - } else { - item.asInstanceOf[Fixed].bytes().clone() - } - case (BinaryType, BYTES) => - (item: AnyRef) => - if (item == null) { - null - } else { - val byteBuffer = item.asInstanceOf[ByteBuffer] - val bytes = new Array[Byte](byteBuffer.remaining) - byteBuffer.get(bytes) - bytes - } - - case (struct: StructType, RECORD) => - val length = struct.fields.length - val converters = new Array[AnyRef => AnyRef](length) - val avroFieldIndexes = new Array[Int](length) - var i = 0 - while (i < length) { - val sqlField = struct.fields(i) - val avroField = avroSchema.getField(sqlField.name) - if (avroField != null) { - val converter = createConverter(avroField.schema(), sqlField.dataType, - path :+ sqlField.name) - converters(i) = converter - avroFieldIndexes(i) = avroField.pos() - } else if (!sqlField.nullable) { - throw new IncompatibleSchemaException( - s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " + - "in Avro schema\n" + - s"Source Avro schema: $sourceAvroSchema.\n" + - s"Target Catalyst type: $targetSqlType") - } - i += 1 - } - - (item: AnyRef) => { - if (item == null) { - null - } else { - val record = item.asInstanceOf[GenericRecord] - - val result = new Array[Any](length) - var i = 0 - while (i < converters.length) { - if (converters(i) != null) { - val converter = converters(i) - result(i) = converter(record.get(avroFieldIndexes(i))) - } - i += 1 - } - new GenericRow(result) - } - } - case (arrayType: ArrayType, ARRAY) => - val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType, - path) - val allowsNull = arrayType.containsNull - (item: AnyRef) => { - if (item == null) { - null - } else { - item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element => - if (element == null && !allowsNull) { - throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " + - "allowed to be null") - } else { - elementConverter(element) - } - } - } - } - case (mapType: MapType, MAP) if mapType.keyType == StringType => - val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path) - val allowsNull = mapType.valueContainsNull - (item: AnyRef) => { - if (item == null) { - null - } else { - item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { x => - if (x._2 == null && !allowsNull) { - throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " + - "allowed to be null") - } else { - (x._1.toString, valueConverter(x._2)) - } - }.toMap - } - } - case (sqlType, UNION) => - if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { - val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) - if (remainingUnionTypes.size == 1) { - createConverter(remainingUnionTypes.head, sqlType, path) - } else { - createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path) - } - } else avroSchema.getTypes.asScala.map(_.getType) match { - case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path) - case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType => - (item: AnyRef) => { - item match { - case null => null - case l: java.lang.Long => l - case i: java.lang.Integer => new java.lang.Long(i.longValue()) - } - } - case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType => - (item: AnyRef) => { - item match { - case null => null - case d: java.lang.Double => d - case f: java.lang.Float => new java.lang.Double(f.doubleValue()) - } - } - case other => - sqlType match { - case t: StructType if t.fields.length == avroSchema.getTypes.size => - val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map { - case (field, schema) => - createConverter(schema, field.dataType, path :+ field.name) - } - - (item: AnyRef) => if (item == null) { - null - } else { - val i = GenericData.get().resolveUnion(avroSchema, item) - val converted = new Array[Any](fieldConverters.length) - converted(i) = fieldConverters(i)(item) - new GenericRow(converted) - } - case _ => throw new IncompatibleSchemaException( - s"Cannot convert Avro schema to catalyst type because schema at path " + - s"${path.mkString(".")} is not compatible " + - s"(avroType = $other, sqlType = $sqlType). \n" + - s"Source Avro schema: $sourceAvroSchema.\n" + - s"Target Catalyst type: $targetSqlType") - } - } - case (left, right) => - throw new IncompatibleSchemaException( - s"Cannot convert Avro schema to catalyst type because schema at path " + - s"${path.mkString(".")} is not compatible (avroType = $left, sqlType = $right). \n" + - s"Source Avro schema: $sourceAvroSchema.\n" + - s"Target Catalyst type: $targetSqlType") - } - } - createConverter(sourceAvroSchema, targetSqlType, List.empty[String]) - } - - def createConverterToAvro(dataType: DataType, - structName: String, - recordNamespace: String): Any => Any = { - dataType match { - case BinaryType => (item: Any) => - item match { - case null => null - case bytes: Array[Byte] => ByteBuffer.wrap(bytes) - } - case IntegerType | LongType | - FloatType | DoubleType | StringType | BooleanType => identity - case ByteType => (item: Any) => - if (item == null) null else item.asInstanceOf[Byte].intValue - case ShortType => (item: Any) => - if (item == null) null else item.asInstanceOf[Short].intValue - case _: DecimalType => (item: Any) => if (item == null) null else item.toString - case TimestampType => (item: Any) => - if (item == null) null else item.asInstanceOf[Timestamp].getTime - case DateType => (item: Any) => - if (item == null) null else item.asInstanceOf[Date].getTime - case ArrayType(elementType, _) => - val elementConverter = createConverterToAvro( - elementType, - structName, - getNewRecordNamespace(elementType, recordNamespace, structName)) - (item: Any) => { - if (item == null) { - null - } else { - val sourceArray = item.asInstanceOf[Seq[Any]] - val sourceArraySize = sourceArray.size - val targetList = new util.ArrayList[Any](sourceArraySize) - var idx = 0 - while (idx < sourceArraySize) { - targetList.add(elementConverter(sourceArray(idx))) - idx += 1 - } - targetList - } - } - case MapType(StringType, valueType, _) => - val valueConverter = createConverterToAvro( - valueType, - structName, - getNewRecordNamespace(valueType, recordNamespace, structName)) - (item: Any) => { - if (item == null) { - null - } else { - val javaMap = new util.HashMap[String, Any]() - item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => - javaMap.put(key, valueConverter(value)) - } - javaMap - } - } - case structType: StructType => - val builder = SchemaBuilder.record(structName).namespace(recordNamespace) - val schema: Schema = SchemaConverters.convertStructToAvro( - structType, builder, recordNamespace) - val fieldConverters = structType.fields.map(field => - createConverterToAvro( - field.dataType, - field.name, - getNewRecordNamespace(field.dataType, recordNamespace, field.name))) - (item: Any) => { - if (item == null) { - null - } else { - val record = new Record(schema) - val convertersIterator = fieldConverters.iterator - val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator - val rowIterator = item.asInstanceOf[Row].toSeq.iterator - - while (convertersIterator.hasNext) { - val converter = convertersIterator.next() - record.put(fieldNamesIterator.next(), converter(rowIterator.next())) - } - record - } - } - } - } - def convertStructTypeToAvroSchema(structType: StructType, structName: String, recordNamespace: String): Schema = { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java index 4049cb655..6416cf9ee 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java @@ -75,7 +75,7 @@ public class AvroConvertor implements Serializable { private void initJsonConvertor() { if (jsonConverter == null) { - jsonConverter = new MercifulJsonConverter(schema); + jsonConverter = new MercifulJsonConverter(); } } @@ -83,7 +83,7 @@ public class AvroConvertor implements Serializable { public GenericRecord fromJson(String json) throws IOException { initSchema(); initJsonConvertor(); - return jsonConverter.convert(json); + return jsonConverter.convert(json, schema); } public Schema getSchema() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index e0fedd6e4..0092de0ea 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -529,31 +530,17 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public static class DistanceUDF implements UDF4 { /** - * Taken from https://stackoverflow.com/questions/3694380/calculating-distance-between-two-points-using-latitude- - * longitude-what-am-i-doi Calculate distance between two points in latitude and longitude taking into account - * height difference. If you are not interested in height difference pass 0.0. Uses Haversine method as its base. - * - * lat1, lon1 Start point lat2, lon2 End point el1 Start altitude in meters el2 End altitude in meters - * - * @returns Distance in Meters + * Returns some random number as distance between the points + * + * @param lat1 Latitiude of source + * @param lat2 Latitude of destination + * @param lon1 Longitude of source + * @param lon2 Longitude of destination + * @return */ @Override public Double call(Double lat1, Double lat2, Double lon1, Double lon2) { - - final int R = 6371; // Radius of the earth - - double latDistance = Math.toRadians(lat2 - lat1); - double lonDistance = Math.toRadians(lon2 - lon1); - double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2) + Math.cos(Math.toRadians(lat1)) - * Math.cos(Math.toRadians(lat2)) * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2); - double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); - double distance = R * c * 1000; // convert to meters - - double height = 0; - - distance = Math.pow(distance, 2) + Math.pow(height, 2); - - return Math.sqrt(distance); + return new Random().nextDouble(); } } diff --git a/pom.xml b/pom.xml index 7773039fa..5a968106b 100644 --- a/pom.xml +++ b/pom.xml @@ -170,6 +170,8 @@ + - spotless-check compile @@ -197,6 +197,7 @@ +--> org.apache.maven.plugins maven-compiler-plugin @@ -345,7 +346,7 @@ **/dependency-reduced-pom.xml **/test/resources/*.data **/target/** - **/generated-sources/** + **/generated-sources/** diff --git a/style/eclipse-java-google-style.xml b/style/eclipse-java-google-style.xml deleted file mode 100644 index f99bb9f3d..000000000 --- a/style/eclipse-java-google-style.xml +++ /dev/null @@ -1,353 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/style/intellij-java-google-style.xml b/style/intellij-java-google-style.xml deleted file mode 100644 index f8b0f5e88..000000000 --- a/style/intellij-java-google-style.xml +++ /dev/null @@ -1,614 +0,0 @@ - - - - - - -