From 738635306bef3d5e7c9395d71409bc4dc8feb2cc Mon Sep 17 00:00:00 2001 From: Omkar Joshi Date: Tue, 30 Apr 2019 19:00:30 -0700 Subject: [PATCH] migrating kryo's dependency from twitter chill to plain kryo library --- hoodie-common/pom.xml | 5 + .../table/log/block/HoodieDeleteBlock.java | 2 +- .../common/util/SerializationUtils.java | 226 +++++++++--------- .../common/util/collection/DiskBasedMap.java | 4 +- .../util/collection/LazyFileIterable.java | 5 +- .../common/util/TestSerializationUtils.java | 79 ++++++ hoodie-utilities/pom.xml | 15 ++ packaging/hoodie-hadoop-mr-bundle/pom.xml | 15 ++ packaging/hoodie-presto-bundle/pom.xml | 12 + packaging/hoodie-spark-bundle/pom.xml | 12 + 10 files changed, 253 insertions(+), 122 deletions(-) create mode 100644 hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index 912f68d24..674ba504c 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -146,5 +146,10 @@ objectsize 0.0.12 + + com.esotericsoftware + kryo-shaded + 4.0.2 + diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java index 9ee4f296b..a31b51279 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java @@ -85,7 +85,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { int dataLength = dis.readInt(); byte[] data = new byte[dataLength]; dis.readFully(data); - this.keysToDelete = SerializationUtils.deserialize(data); + this.keysToDelete = SerializationUtils.deserialize(data); deflate(); } return keysToDelete; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java 
index c52a66674..84c05b677 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java @@ -16,145 +16,55 @@ package com.uber.hoodie.common.util; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.reflectasm.ConstructorAccess; import com.uber.hoodie.exception.HoodieSerializationException; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import org.objenesis.instantiator.ObjectInstantiator; + /** - * (NOTE: Adapted from Apache commons-lang3) - * This class defines API's to serde an object. + * {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / + * deserializing objects. */ public class SerializationUtils { + + // Caching kryo serializer to avoid creating kryo instance for every serde operation + private static final ThreadLocal serializerRef = + ThreadLocal.withInitial(() -> new KryoSerializerInstance()); + // Serialize //----------------------------------------------------------------------- /** - *

<p>Serializes an {@code Object} to the specified stream.</p>

- * - *

<p>The stream will be closed once the object is written. - * This avoids the need for a finally clause, and maybe also exception - * handling, in the application code.</p>

- * - *

<p>The stream passed in is not buffered internally within this method. - * This is the responsibility of your application if desired.</p>

- * - * @param obj the object to serialize to bytes, may be null - * @param outputStream the stream to write to, must not be null - * @throws IllegalArgumentException if {@code outputStream} is {@code null} - * @throws HoodieSerializationException (runtime) if the serialization fails - */ - public static void serialize(final Serializable obj, final OutputStream outputStream) { - if (outputStream == null) { - throw new IllegalArgumentException("The OutputStream must not be null"); - } - ObjectOutputStream out = null; - try { - // stream closed in the finally - out = new ObjectOutputStream(outputStream); - out.writeObject(obj); - - } catch (final IOException ex) { - throw new HoodieSerializationException("unable to serialize object", ex); - } finally { - try { - if (out != null) { - out.close(); - } - } catch (final IOException ex) { // NOPMD - // ignore close exception - } - } - } - - /** - *

<p>Serializes an {@code Object} to a byte array for - * storage/serialization.</p>

+ *

<p>Serializes an {@code Object} to a byte array for storage/serialization.</p>

* * @param obj the object to serialize to bytes * @return a byte[] with the converted Serializable - * @throws HoodieSerializationException (runtime) if the serialization fails + * @throws IOException if the serialization fails */ - public static byte[] serialize(final Serializable obj) { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - serialize(obj, baos); - return baos.toByteArray(); + public static byte[] serialize(final Object obj) throws IOException { + return serializerRef.get().serialize(obj); } // Deserialize //----------------------------------------------------------------------- /** - *

<p> - * Deserializes an {@code Object} from the specified stream. - * </p>

+ *

<p>Deserializes a single {@code Object} from an array of bytes.</p>

* - *

<p> - * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also - * exception handling, in the application code. - * </p>

- * - *

<p> - * The stream passed in is not buffered internally within this method. This is the responsibility of your - * application if desired. - * </p>

- * - *

<p> - * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site. - * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException. - * Note that in both cases, the ClassCastException is in the call site, not in this method. - * </p>

- * - * @param the object type to be deserialized - * @param inputStream the serialized object input stream, must not be null - * @return the deserialized object - * @throws IllegalArgumentException if {@code inputStream} is {@code null} - * @throws HoodieSerializationException (runtime) if the serialization fails - */ - public static T deserialize(final InputStream inputStream) { - if (inputStream == null) { - throw new IllegalArgumentException("The InputStream must not be null"); - } - ObjectInputStream in = null; - try { - // stream closed in the finally - in = new ObjectInputStream(inputStream); - @SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect - final T obj = (T) in.readObject(); - return obj; - - } catch (final ClassCastException ex) { - throw new HoodieSerializationException("cannot cast class", ex); - } catch (final ClassNotFoundException ex) { - throw new HoodieSerializationException("class not found", ex); - } catch (final IOException ex) { - throw new HoodieSerializationException("unable to deserialize to object", ex); - } finally { - try { - if (in != null) { - in.close(); - } - } catch (final IOException ex) { // NOPMD - // ignore close exception - } - } - } - - /** - *

<p> - * Deserializes a single {@code Object} from an array of bytes. - * </p>

- * - *

<p> - * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site. - * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException. - * Note that in both cases, the ClassCastException is in the call site, not in this method. - * </p>

+ *

<p>If the call site incorrectly types the return value, a {@link ClassCastException} is thrown + * from the call site. Without Generics in this declaration, the call site must type cast and can + * cause the same ClassCastException. Note that in both cases, the ClassCastException is in the + * call site, not in this method.</p>

* * @param the object type to be deserialized * @param objectData the serialized object, must not be null @@ -166,6 +76,88 @@ public class SerializationUtils { if (objectData == null) { throw new IllegalArgumentException("The byte[] must not be null"); } - return deserialize(new ByteArrayInputStream(objectData)); + return (T) serializerRef.get().deserialize(objectData); + } + + private static class KryoSerializerInstance implements Serializable { + public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576; + private final Kryo kryo; + // Caching ByteArrayOutputStream to avoid recreating it for every operation + private final ByteArrayOutputStream baos; + + KryoSerializerInstance() { + KryoInstantiator kryoInstantiator = new KryoInstantiator(); + kryo = kryoInstantiator.newKryo(); + baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE); + kryo.setRegistrationRequired(false); + } + + byte[] serialize(Object obj) throws IOException { + kryo.reset(); + baos.reset(); + Output output = new Output(baos); + this.kryo.writeClassAndObject(output, obj); + output.close(); + return baos.toByteArray(); + } + + Object deserialize(byte[] objectData) { + return this.kryo.readClassAndObject(new Input(objectData)); + } + } + + /** + * This class has a no-arg constructor, suitable for use with reflection instantiation. + * For Details checkout com.twitter.chill.KryoBase. + */ + private static class KryoInstantiator implements Serializable { + + public Kryo newKryo() { + + Kryo kryo = new KryoBase(); + // ensure that kryo doesn't fail if classes are not registered with kryo. + kryo.setRegistrationRequired(false); + // This would be used for object initialization if nothing else works out. 
+ kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy()); + // Handle cases where we may have an odd classloader setup like with libjars + // for hadoop + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + return kryo; + } + + private static class KryoBase extends Kryo { + @Override + protected Serializer newDefaultSerializer(Class type) { + final Serializer serializer = super.newDefaultSerializer(type); + if (serializer instanceof FieldSerializer) { + final FieldSerializer fieldSerializer = (FieldSerializer) serializer; + fieldSerializer.setIgnoreSyntheticFields(true); + } + return serializer; + } + + @Override + protected ObjectInstantiator newInstantiator(Class type) { + return () -> { + // First try reflectasm - it is fastest way to instantiate an object. + try { + final ConstructorAccess access = ConstructorAccess.get(type); + return access.newInstance(); + } catch (Throwable t) { + // ignore this exception. We may want to try other way. + } + // fall back to java based instantiation. + try { + final Constructor constructor = type.getConstructor(); + constructor.setAccessible(true); + return constructor.newInstance(); + } catch (NoSuchMethodException | IllegalAccessException + | InstantiationException | InvocationTargetException e) { + // ignore this exception. we will fall back to default instantiation strategy. 
+ } + return super.getInstantiatorStrategy().newInstantiatorOf(type).newInstance(); + }; + } + } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java index 047b99796..9fd0091fa 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -156,8 +156,8 @@ public final class DiskBasedMap return null; } try { - return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, - entry.getOffsetOfValue(), entry.getSizeOfValue())); + return SerializationUtils.deserialize(SpillableMapUtils + .readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java index c2d74e0bd..08aa78494 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java @@ -81,8 +81,9 @@ public class LazyFileIterable implements Iterable { public R next() { Map.Entry entry = this.metadataIterator.next(); try { - return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, - entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue())); + return SerializationUtils.deserialize(SpillableMapUtils + .readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(), + entry.getValue().getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to read hoodie record from value spilled to 
disk", e); } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java new file mode 100644 index 000000000..f4a45b04b --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Objects; +import org.apache.avro.util.Utf8; +import org.junit.Assert; +import org.junit.Test; + +public class TestSerializationUtils { + + @Test + public void testSerDeser() throws IOException { + // It should handle null object references. + verifyObject(null); + // Object with nulls. + verifyObject(new NonSerializableClass(null)); + // Object with valid values & no default constructor. + verifyObject(new NonSerializableClass("testValue")); + // Object with multiple constructor + verifyObject(new NonSerializableClass("testValue1", "testValue2")); + // Object which is of non-serializable class. + verifyObject(new Utf8("test-key")); + // Verify serialization of list. 
+ verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5))); + } + + private void verifyObject(T expectedValue) throws IOException { + byte[] serializedObject = SerializationUtils.serialize(expectedValue); + Assert.assertTrue(serializedObject != null && serializedObject.length > 0); + + final T deserializedValue = SerializationUtils.deserialize(serializedObject); + if (expectedValue == null) { + Assert.assertNull(deserializedValue); + } else { + Assert.assertTrue(expectedValue.equals(deserializedValue)); + } + } + + private static class NonSerializableClass { + private String id; + private String name; + + NonSerializableClass(String id) { + this(id, ""); + } + + NonSerializableClass(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof NonSerializableClass)) { + return false; + } + final NonSerializableClass other = (NonSerializableClass) obj; + return Objects.equals(this.id, other.id) && Objects.equals(this.name, other.name); + } + } +} diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index c95950a4d..00b853df4 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -81,6 +81,9 @@ org.apache.hive:hive-service org.apache.hive:hive-metastore org.apache.hive:hive-jdbc + com.esotericsoftware:kryo-shaded + org.objenesis:objenesis + com.esotericsoftware:minlog @@ -120,6 +123,18 @@ org.apache.hadoop.hive.service. com.uber.hoodie.org.apache.hadoop_hive.service. + + com.esotericsoftware.kryo. + com.uber.hoodie.com.esotericsoftware.kryo. + + + org.objenesis. + com.uber.hoodie.org.objenesis. + + + com.esotericsoftware.minlog. + com.uber.hoodie.com.esotericsoftware.minlog. 
+ diff --git a/packaging/hoodie-hadoop-mr-bundle/pom.xml b/packaging/hoodie-hadoop-mr-bundle/pom.xml index b25cc2918..f25d0f7e6 100644 --- a/packaging/hoodie-hadoop-mr-bundle/pom.xml +++ b/packaging/hoodie-hadoop-mr-bundle/pom.xml @@ -200,6 +200,18 @@ org.apache.commons com.uber.hoodie.org.apache.commons + + com.esotericsoftware.kryo. + com.uber.hoodie.com.esotericsoftware.kryo. + + + org.objenesis. + com.uber.hoodie.org.objenesis. + + + com.esotericsoftware.minlog. + com.uber.hoodie.com.esotericsoftware.minlog. + false @@ -211,6 +223,9 @@ com.twitter.common:objectsize commons-logging:commons-logging commons-io:commons-io + com.esotericsoftware:kryo-shaded + org.objenesis:objenesis + com.esotericsoftware:minlog ${project.artifactId}-${project.version} diff --git a/packaging/hoodie-presto-bundle/pom.xml b/packaging/hoodie-presto-bundle/pom.xml index 750fba9a8..8405b9049 100644 --- a/packaging/hoodie-presto-bundle/pom.xml +++ b/packaging/hoodie-presto-bundle/pom.xml @@ -161,6 +161,18 @@ parquet.schema. com.uber.hoodie.parquet.schema. + + com.esotericsoftware.kryo. + com.uber.hoodie.com.esotericsoftware.kryo. + + + org.objenesis. + com.uber.hoodie.org.objenesis. + + + com.esotericsoftware.minlog. + com.uber.hoodie.com.esotericsoftware.minlog. + false diff --git a/packaging/hoodie-spark-bundle/pom.xml b/packaging/hoodie-spark-bundle/pom.xml index f3b45f498..f1e3ceed6 100644 --- a/packaging/hoodie-spark-bundle/pom.xml +++ b/packaging/hoodie-spark-bundle/pom.xml @@ -148,6 +148,18 @@ org.apache.hadoop.hive.service. com.uber.hoodie.org.apache.hadoop_hive.service. + + com.esotericsoftware.kryo. + com.uber.hoodie.com.esotericsoftware.kryo. + + + org.objenesis. + com.uber.hoodie.org.objenesis. + + + com.esotericsoftware.minlog. + com.uber.hoodie.com.esotericsoftware.minlog. + false