serializerRef =
+ ThreadLocal.withInitial(() -> new KryoSerializerInstance());
+
// Serialize
//-----------------------------------------------------------------------
/**
- * Serializes an {@code Object} to the specified stream.
- *
- * The stream will be closed once the object is written.
- * This avoids the need for a finally clause, and maybe also exception
- * handling, in the application code.
- *
- * The stream passed in is not buffered internally within this method.
- * This is the responsibility of your application if desired.
- *
- * @param obj the object to serialize to bytes, may be null
- * @param outputStream the stream to write to, must not be null
- * @throws IllegalArgumentException if {@code outputStream} is {@code null}
- * @throws HoodieSerializationException (runtime) if the serialization fails
- */
- public static void serialize(final Serializable obj, final OutputStream outputStream) {
- if (outputStream == null) {
- throw new IllegalArgumentException("The OutputStream must not be null");
- }
- ObjectOutputStream out = null;
- try {
- // stream closed in the finally
- out = new ObjectOutputStream(outputStream);
- out.writeObject(obj);
-
- } catch (final IOException ex) {
- throw new HoodieSerializationException("unable to serialize object", ex);
- } finally {
- try {
- if (out != null) {
- out.close();
- }
- } catch (final IOException ex) { // NOPMD
- // ignore close exception
- }
- }
- }
-
- /**
- * Serializes an {@code Object} to a byte array for
- * storage/serialization.
+ * Serializes an {@code Object} to a byte array for storage/serialization.
*
* @param obj the object to serialize to bytes
* @return a byte[] with the converted Serializable
- * @throws HoodieSerializationException (runtime) if the serialization fails
+ * @throws IOException if the serialization fails
*/
- public static byte[] serialize(final Serializable obj) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- serialize(obj, baos);
- return baos.toByteArray();
+ public static byte[] serialize(final Object obj) throws IOException {
+ return serializerRef.get().serialize(obj);
}
// Deserialize
//-----------------------------------------------------------------------
/**
- *
- * Deserializes an {@code Object} from the specified stream.
- *
+ * Deserializes a single {@code Object} from an array of bytes.
*
- *
- * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also
- * exception handling, in the application code.
- *
- *
- *
- * The stream passed in is not buffered internally within this method. This is the responsibility of your
- * application if desired.
- *
- *
- *
- * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
- * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
- * Note that in both cases, the ClassCastException is in the call site, not in this method.
- *
- *
- * @param the object type to be deserialized
- * @param inputStream the serialized object input stream, must not be null
- * @return the deserialized object
- * @throws IllegalArgumentException if {@code inputStream} is {@code null}
- * @throws HoodieSerializationException (runtime) if the serialization fails
- */
- public static T deserialize(final InputStream inputStream) {
- if (inputStream == null) {
- throw new IllegalArgumentException("The InputStream must not be null");
- }
- ObjectInputStream in = null;
- try {
- // stream closed in the finally
- in = new ObjectInputStream(inputStream);
- @SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect
- final T obj = (T) in.readObject();
- return obj;
-
- } catch (final ClassCastException ex) {
- throw new HoodieSerializationException("cannot cast class", ex);
- } catch (final ClassNotFoundException ex) {
- throw new HoodieSerializationException("class not found", ex);
- } catch (final IOException ex) {
- throw new HoodieSerializationException("unable to deserialize to object", ex);
- } finally {
- try {
- if (in != null) {
- in.close();
- }
- } catch (final IOException ex) { // NOPMD
- // ignore close exception
- }
- }
- }
-
- /**
- *
- * Deserializes a single {@code Object} from an array of bytes.
- *
- *
- *
- * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
- * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
- * Note that in both cases, the ClassCastException is in the call site, not in this method.
- *
+ * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown
+ * from the call site. Without Generics in this declaration, the call site must type cast and can
+ * cause the same ClassCastException. Note that in both cases, the ClassCastException is in the
+ * call site, not in this method.
*
* @param the object type to be deserialized
* @param objectData the serialized object, must not be null
@@ -166,6 +76,88 @@ public class SerializationUtils {
if (objectData == null) {
throw new IllegalArgumentException("The byte[] must not be null");
}
- return deserialize(new ByteArrayInputStream(objectData));
+ return (T) serializerRef.get().deserialize(objectData);
+ }
+
+ private static class KryoSerializerInstance implements Serializable {
+ public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
+ private final Kryo kryo;
+ // Caching ByteArrayOutputStream to avoid recreating it for every operation
+ private final ByteArrayOutputStream baos;
+
+ KryoSerializerInstance() {
+ KryoInstantiator kryoInstantiator = new KryoInstantiator();
+ kryo = kryoInstantiator.newKryo();
+ baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
+ kryo.setRegistrationRequired(false);
+ }
+
+ byte[] serialize(Object obj) throws IOException {
+ kryo.reset();
+ baos.reset();
+ Output output = new Output(baos);
+ this.kryo.writeClassAndObject(output, obj);
+ output.close();
+ return baos.toByteArray();
+ }
+
+ Object deserialize(byte[] objectData) {
+ return this.kryo.readClassAndObject(new Input(objectData));
+ }
+ }
+
+ /**
+ * This class has a no-arg constructor, suitable for use with reflection instantiation.
+ * For Details checkout com.twitter.chill.KryoBase.
+ */
+ private static class KryoInstantiator implements Serializable {
+
+ public Kryo newKryo() {
+
+ Kryo kryo = new KryoBase();
+ // ensure that kryo doesn't fail if classes are not registered with kryo.
+ kryo.setRegistrationRequired(false);
+ // This would be used for object initialization if nothing else works out.
+ kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+ // Handle cases where we may have an odd classloader setup like with libjars
+ // for hadoop
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return kryo;
+ }
+
+ private static class KryoBase extends Kryo {
+ @Override
+ protected Serializer newDefaultSerializer(Class type) {
+ final Serializer serializer = super.newDefaultSerializer(type);
+ if (serializer instanceof FieldSerializer) {
+ final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
+ fieldSerializer.setIgnoreSyntheticFields(true);
+ }
+ return serializer;
+ }
+
+ @Override
+ protected ObjectInstantiator newInstantiator(Class type) {
+ return () -> {
+ // First try reflectasm - it is fastest way to instantiate an object.
+ try {
+ final ConstructorAccess access = ConstructorAccess.get(type);
+ return access.newInstance();
+ } catch (Throwable t) {
+ // ignore this exception. We may want to try other way.
+ }
+ // fall back to java based instantiation.
+ try {
+ final Constructor constructor = type.getConstructor();
+ constructor.setAccessible(true);
+ return constructor.newInstance();
+ } catch (NoSuchMethodException | IllegalAccessException
+ | InstantiationException | InvocationTargetException e) {
+ // ignore this exception. we will fall back to default instantiation strategy.
+ }
+ return super.getInstantiatorStrategy().newInstantiatorOf(type).newInstance();
+ };
+ }
+ }
}
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
index 047b99796..9fd0091fa 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
@@ -156,8 +156,8 @@ public final class DiskBasedMap
return null;
}
try {
- return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
- entry.getOffsetOfValue(), entry.getSizeOfValue()));
+ return SerializationUtils.deserialize(SpillableMapUtils
+ .readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
index c2d74e0bd..08aa78494 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
@@ -81,8 +81,9 @@ public class LazyFileIterable implements Iterable {
public R next() {
Map.Entry entry = this.metadataIterator.next();
try {
- return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
- entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
+ return SerializationUtils.deserialize(SpillableMapUtils
+ .readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(),
+ entry.getValue().getSizeOfValue()));
} catch (IOException e) {
throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
}
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java
new file mode 100644
index 000000000..f4a45b04b
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Objects;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSerializationUtils {
+
+ @Test
+ public void testSerDeser() throws IOException {
+ // It should handle null object references.
+ verifyObject(null);
+ // Object with nulls.
+ verifyObject(new NonSerializableClass(null));
+ // Object with valid values & no default constructor.
+ verifyObject(new NonSerializableClass("testValue"));
+ // Object with multiple constructor
+ verifyObject(new NonSerializableClass("testValue1", "testValue2"));
+ // Object which is of non-serializable class.
+ verifyObject(new Utf8("test-key"));
+ // Verify serialization of list.
+ verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
+ }
+
+ private void verifyObject(T expectedValue) throws IOException {
+ byte[] serializedObject = SerializationUtils.serialize(expectedValue);
+ Assert.assertTrue(serializedObject != null && serializedObject.length > 0);
+
+ final T deserializedValue = SerializationUtils.deserialize(serializedObject);
+ if (expectedValue == null) {
+ Assert.assertNull(deserializedValue);
+ } else {
+ Assert.assertTrue(expectedValue.equals(deserializedValue));
+ }
+ }
+
+ private static class NonSerializableClass {
+ private String id;
+ private String name;
+
+ NonSerializableClass(String id) {
+ this(id, "");
+ }
+
+ NonSerializableClass(String id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof NonSerializableClass)) {
+ return false;
+ }
+ final NonSerializableClass other = (NonSerializableClass) obj;
+ return Objects.equals(this.id, other.id) && Objects.equals(this.name, other.name);
+ }
+ }
+}
diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml
index c95950a4d..00b853df4 100644
--- a/hoodie-utilities/pom.xml
+++ b/hoodie-utilities/pom.xml
@@ -81,6 +81,9 @@
org.apache.hive:hive-service
org.apache.hive:hive-metastore
org.apache.hive:hive-jdbc
+ com.esotericsoftware:kryo-shaded
+ org.objenesis:objenesis
+ com.esotericsoftware:minlog
@@ -120,6 +123,18 @@
org.apache.hadoop.hive.service.
com.uber.hoodie.org.apache.hadoop_hive.service.
+
+ com.esotericsoftware.kryo.
+ com.uber.hoodie.com.esotericsoftware.kryo.
+
+
+ org.objenesis.
+ com.uber.hoodie.org.objenesis.
+
+
+ com.esotericsoftware.minlog.
+ com.uber.hoodie.com.esotericsoftware.minlog.
+
diff --git a/packaging/hoodie-hadoop-mr-bundle/pom.xml b/packaging/hoodie-hadoop-mr-bundle/pom.xml
index b25cc2918..f25d0f7e6 100644
--- a/packaging/hoodie-hadoop-mr-bundle/pom.xml
+++ b/packaging/hoodie-hadoop-mr-bundle/pom.xml
@@ -200,6 +200,18 @@
org.apache.commons
com.uber.hoodie.org.apache.commons
+
+ com.esotericsoftware.kryo.
+ com.uber.hoodie.com.esotericsoftware.kryo.
+
+
+ org.objenesis.
+ com.uber.hoodie.org.objenesis.
+
+
+ com.esotericsoftware.minlog.
+ com.uber.hoodie.com.esotericsoftware.minlog.
+
false
@@ -211,6 +223,9 @@
com.twitter.common:objectsize
commons-logging:commons-logging
commons-io:commons-io
+ com.esotericsoftware:kryo-shaded
+ org.objenesis:objenesis
+ com.esotericsoftware:minlog
${project.artifactId}-${project.version}
diff --git a/packaging/hoodie-presto-bundle/pom.xml b/packaging/hoodie-presto-bundle/pom.xml
index 750fba9a8..8405b9049 100644
--- a/packaging/hoodie-presto-bundle/pom.xml
+++ b/packaging/hoodie-presto-bundle/pom.xml
@@ -161,6 +161,18 @@
parquet.schema.
com.uber.hoodie.parquet.schema.
+
+ com.esotericsoftware.kryo.
+ com.uber.hoodie.com.esotericsoftware.kryo.
+
+
+ org.objenesis.
+ com.uber.hoodie.org.objenesis.
+
+
+ com.esotericsoftware.minlog.
+ com.uber.hoodie.com.esotericsoftware.minlog.
+
false
diff --git a/packaging/hoodie-spark-bundle/pom.xml b/packaging/hoodie-spark-bundle/pom.xml
index f3b45f498..f1e3ceed6 100644
--- a/packaging/hoodie-spark-bundle/pom.xml
+++ b/packaging/hoodie-spark-bundle/pom.xml
@@ -148,6 +148,18 @@
org.apache.hadoop.hive.service.
com.uber.hoodie.org.apache.hadoop_hive.service.
+
+ com.esotericsoftware.kryo.
+ com.uber.hoodie.com.esotericsoftware.kryo.
+
+
+ org.objenesis.
+ com.uber.hoodie.org.objenesis.
+
+
+ com.esotericsoftware.minlog.
+ com.uber.hoodie.com.esotericsoftware.minlog.
+
false