[HUDI-121] Fix licensing issues found during RC voting by general incubator group
This commit is contained in:
committed by
Balaji Varadarajan
parent
8c13340062
commit
77f4e73615
@@ -34,7 +34,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
|
||||
private String minRecordKey;
|
||||
private String maxRecordKey;
|
||||
|
||||
|
||||
public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter";
|
||||
public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
|
||||
public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
|
||||
public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
|
||||
|
||||
@@ -19,52 +19,99 @@
|
||||
package org.apache.hudi.avro;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Type;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
/**
|
||||
* Marjority of this is copied from
|
||||
* https://github.com/jwills/avro-json/blob/master/src/main/java/com/cloudera/science/avro/ common/JsonConverter.java
|
||||
* Adjusted for expected behavior of our use cases
|
||||
* Converts Json record to Avro Generic Record
|
||||
*/
|
||||
public class MercifulJsonConverter {
|
||||
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
private final Schema baseSchema;
|
||||
private static final Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = getFieldTypeProcessors();
|
||||
|
||||
public MercifulJsonConverter(Schema schema) {
|
||||
this.baseSchema = schema;
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
/**
|
||||
* Build type processor map for each avro type.
|
||||
*/
|
||||
private static Map<Schema.Type, JsonToAvroFieldProcessor> getFieldTypeProcessors() {
|
||||
Map<Schema.Type, JsonToAvroFieldProcessor> processorMap =
|
||||
new ImmutableMap.Builder<Schema.Type, JsonToAvroFieldProcessor>().put(Type.STRING, generateStringTypeHandler())
|
||||
.put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler())
|
||||
.put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler())
|
||||
.put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler())
|
||||
.put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler())
|
||||
.put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler())
|
||||
.put(Type.FIXED, generateFixedTypeHandler()).build();
|
||||
return processorMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses a default objectMapper to deserialize a json string
|
||||
*/
|
||||
public MercifulJsonConverter() {
|
||||
this(new ObjectMapper());
|
||||
}
|
||||
|
||||
public GenericRecord convert(String json) throws IOException {
|
||||
/**
|
||||
* Allows a configured ObjectMapper to be passed for converting json records to avro record
|
||||
*/
|
||||
public MercifulJsonConverter(ObjectMapper mapper) {
|
||||
this.mapper = mapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts json to Avro generic record
|
||||
*
|
||||
* @param json Json record
|
||||
* @param schema Schema
|
||||
*/
|
||||
public GenericRecord convert(String json, Schema schema) {
|
||||
try {
|
||||
return convert(mapper.readValue(json, Map.class), baseSchema);
|
||||
Map<String, Object> jsonObjectMap = mapper.readValue(json, Map.class);
|
||||
return convertJsonToAvro(jsonObjectMap, schema);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to parse as Json: " + json + "\n\n" + e.getMessage());
|
||||
throw new HoodieIOException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private GenericRecord convert(Map<String, Object> raw, Schema schema) throws IOException {
|
||||
GenericRecord result = new GenericData.Record(schema);
|
||||
private static GenericRecord convertJsonToAvro(Map<String, Object> inputJson, Schema schema) {
|
||||
GenericRecord avroRecord = new GenericData.Record(schema);
|
||||
for (Schema.Field f : schema.getFields()) {
|
||||
String name = f.name();
|
||||
Object rawValue = raw.get(name);
|
||||
if (rawValue != null) {
|
||||
result.put(f.pos(), typeConvert(rawValue, name, f.schema()));
|
||||
Object val = inputJson.get(f.name());
|
||||
if (val != null) {
|
||||
avroRecord.put(f.pos(), convertJsonToAvroField(val, f.name(), f.schema()));
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
return avroRecord;
|
||||
}
|
||||
|
||||
private Object typeConvert(Object value, String name, Schema schema) throws IOException {
|
||||
private static Schema getNonNull(Schema schema) {
|
||||
List<Schema> types = schema.getTypes();
|
||||
Schema.Type firstType = types.get(0).getType();
|
||||
return firstType.equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
|
||||
}
|
||||
|
||||
private static boolean isOptional(Schema schema) {
|
||||
return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2
|
||||
&& (schema.getTypes().get(0).getType().equals(Schema.Type.NULL)
|
||||
|| schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
|
||||
}
|
||||
|
||||
private static Object convertJsonToAvroField(Object value, String name, Schema schema) {
|
||||
|
||||
if (isOptional(schema)) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
@@ -73,83 +120,204 @@ public class MercifulJsonConverter {
|
||||
}
|
||||
} else if (value == null) {
|
||||
// Always fail on null for non-nullable schemas
|
||||
throw new JsonConversionException(null, name, schema);
|
||||
throw new HoodieJsonToAvroConversionException(null, name, schema);
|
||||
}
|
||||
|
||||
switch (schema.getType()) {
|
||||
case BOOLEAN:
|
||||
JsonToAvroFieldProcessor processor = fieldTypeProcessors.get(schema.getType());
|
||||
if (null != processor) {
|
||||
return processor.convertToAvro(value, name, schema);
|
||||
}
|
||||
throw new IllegalArgumentException("JsonConverter cannot handle type: " + schema.getType());
|
||||
}
|
||||
|
||||
/**
|
||||
* Base Class for converting json to avro fields
|
||||
*/
|
||||
private abstract static class JsonToAvroFieldProcessor implements Serializable {
|
||||
|
||||
public Object convertToAvro(Object value, String name, Schema schema) {
|
||||
Pair<Boolean, Object> res = convert(value, name, schema);
|
||||
if (!res.getLeft()) {
|
||||
throw new HoodieJsonToAvroConversionException(value, name, schema);
|
||||
}
|
||||
return res.getRight();
|
||||
}
|
||||
|
||||
protected abstract Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException;
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
if (value instanceof Boolean) {
|
||||
return value;
|
||||
return Pair.of(true, value);
|
||||
}
|
||||
break;
|
||||
case DOUBLE:
|
||||
return Pair.of(false, null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateIntTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).doubleValue();
|
||||
return Pair.of(true, ((Number) value).intValue());
|
||||
} else if (value instanceof String) {
|
||||
return Pair.of(true, Integer.valueOf((String) value));
|
||||
}
|
||||
break;
|
||||
case FLOAT:
|
||||
return Pair.of(false, null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).floatValue();
|
||||
return Pair.of(true, ((Number) value).doubleValue());
|
||||
} else if (value instanceof String) {
|
||||
return Pair.of(true, Double.valueOf((String) value));
|
||||
}
|
||||
break;
|
||||
case INT:
|
||||
return Pair.of(false, null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).intValue();
|
||||
return Pair.of(true, ((Number) value).floatValue());
|
||||
} else if (value instanceof String) {
|
||||
return Pair.of(true, Float.valueOf((String) value));
|
||||
}
|
||||
break;
|
||||
case LONG:
|
||||
return Pair.of(false, null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateLongTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).longValue();
|
||||
return Pair.of(true, ((Number) value).longValue());
|
||||
} else if (value instanceof String) {
|
||||
return Pair.of(true, Long.valueOf((String) value));
|
||||
}
|
||||
break;
|
||||
case STRING:
|
||||
return value.toString();
|
||||
case ENUM:
|
||||
return Pair.of(false, null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateStringTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
return Pair.of(true, value.toString());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateBytesTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
return Pair.of(true, value.toString().getBytes());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
byte[] src = value.toString().getBytes();
|
||||
byte[] dst = new byte[schema.getFixedSize()];
|
||||
System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), src.length));
|
||||
return Pair.of(true, dst);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateEnumTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
if (schema.getEnumSymbols().contains(value.toString())) {
|
||||
return new GenericData.EnumSymbol(schema, value.toString());
|
||||
return Pair.of(true, new GenericData.EnumSymbol(schema, value.toString()));
|
||||
}
|
||||
throw new JsonConversionException(String.format("Symbol %s not in enum", value.toString()),
|
||||
throw new HoodieJsonToAvroConversionException(String.format("Symbol %s not in enum", value.toString()),
|
||||
schema.getFullName(), schema);
|
||||
case RECORD:
|
||||
return convert((Map<String, Object>) value, schema);
|
||||
case ARRAY:
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateRecordTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
GenericRecord result = new GenericData.Record(schema);
|
||||
return Pair.of(true, convertJsonToAvro((Map<String, Object>) value, schema));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateArrayTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
Schema elementSchema = schema.getElementType();
|
||||
List listRes = new ArrayList();
|
||||
for (Object v : (List) value) {
|
||||
listRes.add(typeConvert(v, name, elementSchema));
|
||||
listRes.add(convertJsonToAvroField(v, name, elementSchema));
|
||||
}
|
||||
return listRes;
|
||||
case MAP:
|
||||
return Pair.of(true, listRes);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static JsonToAvroFieldProcessor generateMapTypeHandler() {
|
||||
return new JsonToAvroFieldProcessor() {
|
||||
@Override
|
||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
|
||||
throws HoodieJsonToAvroConversionException {
|
||||
Schema valueSchema = schema.getValueType();
|
||||
Map<String, Object> mapRes = new HashMap<String, Object>();
|
||||
for (Map.Entry<String, Object> v : ((Map<String, Object>) value).entrySet()) {
|
||||
mapRes.put(v.getKey(), typeConvert(v.getValue(), name, valueSchema));
|
||||
mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema));
|
||||
}
|
||||
return mapRes;
|
||||
default:
|
||||
throw new IllegalArgumentException("JsonConverter cannot handle type: " + schema.getType());
|
||||
}
|
||||
throw new JsonConversionException(value, name, schema);
|
||||
return Pair.of(true, mapRes);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private boolean isOptional(Schema schema) {
|
||||
return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2
|
||||
&& (schema.getTypes().get(0).getType().equals(Schema.Type.NULL)
|
||||
|| schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
|
||||
}
|
||||
|
||||
private Schema getNonNull(Schema schema) {
|
||||
List<Schema> types = schema.getTypes();
|
||||
return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
|
||||
}
|
||||
|
||||
public static class JsonConversionException extends RuntimeException {
|
||||
/**
|
||||
* Exception Class for any schema conversion issue
|
||||
*/
|
||||
public static class HoodieJsonToAvroConversionException extends HoodieException {
|
||||
|
||||
private Object value;
|
||||
private String fieldName;
|
||||
private Schema schema;
|
||||
|
||||
public JsonConversionException(Object value, String fieldName, Schema schema) {
|
||||
public HoodieJsonToAvroConversionException(Object value, String fieldName, Schema schema) {
|
||||
this.value = value;
|
||||
this.fieldName = fieldName;
|
||||
this.schema = schema;
|
||||
@@ -157,7 +325,7 @@ public class MercifulJsonConverter {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Type conversion error for field %s, %s for %s", fieldName, value, schema);
|
||||
return String.format("Json to Avro Type conversion error for field %s, %s for %s", fieldName, value, schema);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,8 +56,8 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
|
||||
|
||||
@Override
|
||||
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
|
||||
MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema);
|
||||
return Option.of(jsonConverter.convert(getJsonData()));
|
||||
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
|
||||
return Option.of(jsonConverter.convert(getJsonData(), schema));
|
||||
}
|
||||
|
||||
private String getJsonData() throws IOException {
|
||||
|
||||
@@ -1,22 +1,3 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
// COPIED FROM https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/objectsize/
|
||||
// ObjectSizeCalculator.java
|
||||
// =================================================================================================
|
||||
// Copyright 2011 Twitter, Inc.
|
||||
// -------------------------------------------------------------------------------------------------
|
||||
|
||||
@@ -24,291 +24,122 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Copied from java.util.Optional and made Serializable along with methods to convert to/from standard Option
|
||||
* Provides functionality same as java.util.Optional but is also made Serializable. Additional APIs are provided to
|
||||
* convert to/from java.util.Optional
|
||||
*/
|
||||
public final class Option<T> implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 0L;
|
||||
|
||||
/**
|
||||
* Common instance for {@code empty()}.
|
||||
*/
|
||||
private static final Option<?> EMPTY = new Option<>();
|
||||
private static final Option<?> NULL_VAL = new Option<>();
|
||||
|
||||
/**
|
||||
* If non-null, the value; if null, indicates no value is present
|
||||
*/
|
||||
private final T value;
|
||||
|
||||
/**
|
||||
* Constructs an empty instance.
|
||||
*
|
||||
* @implNote Generally only one empty instance, {@link Option#EMPTY}, should exist per VM.
|
||||
*/
|
||||
private Option() {
|
||||
this.value = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an empty {@code Option} instance. No value is present for this Option.
|
||||
*
|
||||
* @param <T> Type of the non-existent value
|
||||
* @return an empty {@code Option}
|
||||
* @apiNote Though it may be tempting to do so, avoid testing if an object is empty by comparing with {@code ==}
|
||||
* against instances returned by {@code Option.empty()}. There is no guarantee that it is a singleton.
|
||||
* Instead, use {@link #isPresent()}.
|
||||
*/
|
||||
public static <T> Option<T> empty() {
|
||||
@SuppressWarnings("unchecked")
|
||||
Option<T> t = (Option<T>) EMPTY;
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an instance with the value present.
|
||||
*
|
||||
* @param value the non-null value to be present
|
||||
* @throws NullPointerException if value is null
|
||||
*/
|
||||
private Option(T value) {
|
||||
this.value = Objects.requireNonNull(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code Option} with the specified present non-null value.
|
||||
*
|
||||
* @param <T> the class of the value
|
||||
* @param value the value to be present, which must be non-null
|
||||
* @return an {@code Option} with the value present
|
||||
* @throws NullPointerException if value is null
|
||||
*/
|
||||
public static <T> Option<T> of(T value) {
|
||||
return new Option<>(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code Option} describing the specified value, if non-null, otherwise returns an empty {@code Option}.
|
||||
*
|
||||
* @param <T> the class of the value
|
||||
* @param value the possibly-null value to describe
|
||||
* @return an {@code Option} with a present value if the specified value is non-null, otherwise an empty {@code
|
||||
* Option}
|
||||
*/
|
||||
public static <T> Option<T> ofNullable(T value) {
|
||||
return value == null ? empty() : of(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* If a value is present in this {@code Option}, returns the value, otherwise throws {@code NoSuchElementException}.
|
||||
*
|
||||
* @return the non-null value held by this {@code Option}
|
||||
* @throws NoSuchElementException if there is no value present
|
||||
* @see Option#isPresent()
|
||||
*/
|
||||
public T get() {
|
||||
if (value == null) {
|
||||
throw new NoSuchElementException("No value present");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} if there is a value present, otherwise {@code false}.
|
||||
*
|
||||
* @return {@code true} if there is a value present, otherwise {@code false}
|
||||
*/
|
||||
public boolean isPresent() {
|
||||
return value != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* If a value is present, invoke the specified consumer with the value, otherwise do nothing.
|
||||
*
|
||||
* @param consumer block to be executed if a value is present
|
||||
* @throws NullPointerException if value is present and {@code consumer} is null
|
||||
*/
|
||||
public void ifPresent(Consumer<? super T> consumer) {
|
||||
if (value != null) {
|
||||
consumer.accept(value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If a value is present, and the value matches the given predicate, return an {@code Option} describing the value,
|
||||
* otherwise return an empty {@code Option}.
|
||||
*
|
||||
* @param predicate a predicate to apply to the value, if present
|
||||
* @return an {@code Option} describing the value of this {@code Option} if a value is present and the value matches
|
||||
* the given predicate, otherwise an empty {@code Option}
|
||||
* @throws NullPointerException if the predicate is null
|
||||
*/
|
||||
public Option<T> filter(Predicate<? super T> predicate) {
|
||||
Objects.requireNonNull(predicate);
|
||||
if (!isPresent()) {
|
||||
return this;
|
||||
} else {
|
||||
return predicate.test(value) ? this : empty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If a value is present, apply the provided mapping function to it, and if the result is non-null, return an {@code
|
||||
* Option} describing the result. Otherwise return an empty {@code Option}.
|
||||
*
|
||||
* @param <U> The type of the result of the mapping function
|
||||
* @param mapper a mapping function to apply to the value, if present
|
||||
* @return an {@code Option} describing the result of applying a mapping function to the value of this {@code Option},
|
||||
* if a value is present, otherwise an empty {@code Option}
|
||||
* @throws NullPointerException if the mapping function is null
|
||||
* @apiNote This method supports post-processing on optional values, without the need to explicitly check for a return
|
||||
* status. For example, the following code traverses a stream of file names, selects one that has not yet
|
||||
* been processed, and then opens that file, returning an {@code Option<FileInputStream>}:
|
||||
*
|
||||
* <pre>
|
||||
* {@code
|
||||
* Option<FileInputStream> fis =
|
||||
* names.stream().filter(name -> !isProcessedYet(name))
|
||||
* .findFirst()
|
||||
* .map(name -> new FileInputStream(name));
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* Here, {@code findFirst} returns an {@code Option<String>}, and then {@code map} returns an {@code
|
||||
* Option<FileInputStream>} for the desired file if one exists.
|
||||
*/
|
||||
public <U> Option<U> map(Function<? super T, ? extends U> mapper) {
|
||||
Objects.requireNonNull(mapper);
|
||||
if (!isPresent()) {
|
||||
return empty();
|
||||
} else {
|
||||
return Option.ofNullable(mapper.apply(value));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If a value is present, apply the provided {@code Option}-bearing mapping function to it, return that result,
|
||||
* otherwise return an empty {@code Option}. This method is similar to {@link #map(Function)}, but the provided mapper
|
||||
* is one whose result is already an {@code Option}, and if invoked, {@code flatMap} does not wrap it with an
|
||||
* additional {@code Option}.
|
||||
*
|
||||
* @param <U> The type parameter to the {@code Option} returned by
|
||||
* @param mapper a mapping function to apply to the value, if present the mapping function
|
||||
* @return the result of applying an {@code Option}-bearing mapping function to the value of this {@code Option}, if a
|
||||
* value is present, otherwise an empty {@code Option}
|
||||
* @throws NullPointerException if the mapping function is null or returns a null result
|
||||
*/
|
||||
public <U> Option<U> flatMap(Function<? super T, Option<U>> mapper) {
|
||||
Objects.requireNonNull(mapper);
|
||||
if (!isPresent()) {
|
||||
return empty();
|
||||
} else {
|
||||
return Objects.requireNonNull(mapper.apply(value));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the value if present, otherwise return {@code other}.
|
||||
*
|
||||
* @param other the value to be returned if there is no value present, may be null
|
||||
* @return the value, if present, otherwise {@code other}
|
||||
*/
|
||||
public T orElse(T other) {
|
||||
return value != null ? value : other;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the value if present, otherwise invoke {@code other} and return the result of that invocation.
|
||||
*
|
||||
* @param other a {@code Supplier} whose result is returned if no value is present
|
||||
* @return the value if present otherwise the result of {@code other.get()}
|
||||
* @throws NullPointerException if value is not present and {@code other} is null
|
||||
*/
|
||||
public T orElseGet(Supplier<? extends T> other) {
|
||||
return value != null ? value : other.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the contained value, if present, otherwise throw an exception to be created by the provided supplier.
|
||||
*
|
||||
* @param <X> Type of the exception to be thrown
|
||||
* @param exceptionSupplier The supplier which will return the exception to be thrown
|
||||
* @return the present value
|
||||
* @throws X if there is no value present
|
||||
* @throws NullPointerException if no value is present and {@code exceptionSupplier} is null
|
||||
* @apiNote A method reference to the exception constructor with an empty argument list can be used as the supplier.
|
||||
* For example, {@code IllegalStateException::new}
|
||||
*/
|
||||
public <X extends Throwable> T orElseThrow(Supplier<? extends X> exceptionSupplier) throws X {
|
||||
if (value != null) {
|
||||
return value;
|
||||
} else {
|
||||
throw exceptionSupplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether some other object is "equal to" this Option. The other object is considered equal if:
|
||||
* <ul>
|
||||
* <li>it is also an {@code Option} and;
|
||||
* <li>both instances have no value present or;
|
||||
* <li>the present values are "equal to" each other via {@code equals()}.
|
||||
* </ul>
|
||||
*
|
||||
* @param obj an object to be tested for equality
|
||||
* @return {code true} if the other object is "equal to" this object otherwise {@code false}
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!(obj instanceof Option)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Option<?> other = (Option<?>) obj;
|
||||
return Objects.equals(value, other.value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the hash code value of the present value, if any, or 0 (zero) if no value is present.
|
||||
*
|
||||
* @return hash code value of the present value or 0 if no value is present
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a non-empty string representation of this Option suitable for debugging. The exact presentation format is
|
||||
* unspecified and may vary between implementations and versions.
|
||||
*
|
||||
* @return the string representation of this instance
|
||||
* @implSpec If a value is present the result must include its string representation in the result. Empty and present
|
||||
* Optionals must be unambiguously differentiable.
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return value != null ? String.format("Option[%s]", value) : "Option.empty";
|
||||
}
|
||||
private final T val;
|
||||
|
||||
/**
|
||||
* Convert to java Optional
|
||||
*/
|
||||
public Optional<T> toJavaOptional() {
|
||||
return Optional.ofNullable(value);
|
||||
return Optional.ofNullable(val);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert from java.util.Optional
|
||||
*
|
||||
* @param v java.util.Optional object
|
||||
* @param <T> type of the value stored in java.util.Optional object
|
||||
* @return Option
|
||||
*/
|
||||
public static <T> Option<T> fromJavaOptional(Optional<T> v) {
|
||||
return Option.ofNullable(v.orElse(null));
|
||||
}
|
||||
|
||||
private Option() {
|
||||
this.val = null;
|
||||
}
|
||||
|
||||
private Option(T val) {
|
||||
if (null == val) {
|
||||
throw new NullPointerException("Expected a non-null value. Got null");
|
||||
}
|
||||
this.val = val;
|
||||
}
|
||||
|
||||
public static <T> Option<T> empty() {
|
||||
return (Option<T>) NULL_VAL;
|
||||
}
|
||||
|
||||
public static <T> Option<T> of(T value) {
|
||||
return new Option<>(value);
|
||||
}
|
||||
|
||||
public static <T> Option<T> ofNullable(T value) {
|
||||
return null == value ? empty() : of(value);
|
||||
}
|
||||
|
||||
public boolean isPresent() {
|
||||
return null != val;
|
||||
}
|
||||
|
||||
public T get() {
|
||||
if (null == val) {
|
||||
throw new NoSuchElementException("No value present in Option");
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
public void ifPresent(Consumer<? super T> consumer) {
|
||||
if (val != null) {
|
||||
// process the value
|
||||
consumer.accept(val);
|
||||
}
|
||||
}
|
||||
|
||||
public <U> Option<U> map(Function<? super T, ? extends U> mapper) {
|
||||
if (!isPresent()) {
|
||||
return empty();
|
||||
} else {
|
||||
return Option.ofNullable(mapper.apply(val));
|
||||
}
|
||||
}
|
||||
|
||||
public T orElse(T other) {
|
||||
return val != null ? val : other;
|
||||
}
|
||||
|
||||
public T orElseGet(Supplier<? extends T> other) {
|
||||
return val != null ? val : other.get();
|
||||
}
|
||||
|
||||
public <X extends Throwable> T orElseThrow(Supplier<? extends X> exceptionSupplier) throws X {
|
||||
if (val != null) {
|
||||
return val;
|
||||
} else {
|
||||
throw exceptionSupplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Option<?> option = (Option<?>) o;
|
||||
return Objects.equals(val, option.val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Option{" + "val=" + val + '}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -116,15 +117,15 @@ public class ParquetUtils {
|
||||
return readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema();
|
||||
}
|
||||
|
||||
private static List<String> readParquetFooter(Configuration configuration, Path parquetFilePath,
|
||||
String... footerNames) {
|
||||
List<String> footerVals = new ArrayList<>();
|
||||
private static Map<String, String> readParquetFooter(Configuration configuration, boolean required,
|
||||
Path parquetFilePath, String... footerNames) {
|
||||
Map<String, String> footerVals = new HashMap<>();
|
||||
ParquetMetadata footer = readMetadata(configuration, parquetFilePath);
|
||||
Map<String, String> metadata = footer.getFileMetaData().getKeyValueMetaData();
|
||||
for (String footerName : footerNames) {
|
||||
if (metadata.containsKey(footerName)) {
|
||||
footerVals.add(metadata.get(footerName));
|
||||
} else {
|
||||
footerVals.put(footerName, metadata.get(footerName));
|
||||
} else if (required) {
|
||||
throw new MetadataNotFoundException(
|
||||
"Could not find index in Parquet footer. " + "Looked for key " + footerName + " in " + parquetFilePath);
|
||||
}
|
||||
@@ -140,21 +141,28 @@ public class ParquetUtils {
|
||||
* Read out the bloom filter from the parquet file meta data.
|
||||
*/
|
||||
public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) {
|
||||
String footerVal =
|
||||
readParquetFooter(configuration, parquetFilePath, HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)
|
||||
.get(0);
|
||||
return new BloomFilter(footerVal);
|
||||
Map<String, String> footerVals =
|
||||
readParquetFooter(configuration, false, parquetFilePath,
|
||||
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
|
||||
HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
|
||||
String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
|
||||
if (null == footerVal) {
|
||||
// We use old style key "com.uber.hoodie.bloomfilter"
|
||||
footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
|
||||
}
|
||||
return footerVal != null ? new BloomFilter(footerVal) : null;
|
||||
}
|
||||
|
||||
public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
|
||||
List<String> minMaxKeys = readParquetFooter(configuration, parquetFilePath,
|
||||
Map<String, String> minMaxKeys = readParquetFooter(configuration, true, parquetFilePath,
|
||||
HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
|
||||
if (minMaxKeys.size() != 2) {
|
||||
throw new HoodieException(
|
||||
String.format("Could not read min/max record key out of footer correctly from %s. read) : %s",
|
||||
parquetFilePath, minMaxKeys));
|
||||
}
|
||||
return new String[] {minMaxKeys.get(0), minMaxKeys.get(1)};
|
||||
return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
|
||||
minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -167,7 +167,7 @@ public class SchemaTestUtil {
|
||||
public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, String commitTime,
|
||||
String fileId) throws IOException {
|
||||
TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
|
||||
MercifulJsonConverter converter = new MercifulJsonConverter(schema);
|
||||
return converter.convert(record.toJsonString());
|
||||
MercifulJsonConverter converter = new MercifulJsonConverter();
|
||||
return converter.convert(record.toJsonString(), schema);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user