|
|
|
|
@@ -16,145 +16,55 @@
|
|
|
|
|
|
|
|
|
|
package com.uber.hoodie.common.util;
|
|
|
|
|
|
|
|
|
|
import com.esotericsoftware.kryo.Kryo;
|
|
|
|
|
import com.esotericsoftware.kryo.Serializer;
|
|
|
|
|
import com.esotericsoftware.kryo.io.Input;
|
|
|
|
|
import com.esotericsoftware.kryo.io.Output;
|
|
|
|
|
import com.esotericsoftware.kryo.serializers.FieldSerializer;
|
|
|
|
|
import com.esotericsoftware.reflectasm.ConstructorAccess;
|
|
|
|
|
import com.uber.hoodie.exception.HoodieSerializationException;
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.io.InputStream;
|
|
|
|
|
import java.io.ObjectInputStream;
|
|
|
|
|
import java.io.ObjectOutputStream;
|
|
|
|
|
import java.io.OutputStream;
|
|
|
|
|
import java.io.Serializable;
|
|
|
|
|
import java.lang.reflect.Constructor;
|
|
|
|
|
import java.lang.reflect.InvocationTargetException;
|
|
|
|
|
import org.objenesis.instantiator.ObjectInstantiator;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* (NOTE: Adapted from Apache commons-lang3)
|
|
|
|
|
* This class defines API's to serde an object.
|
|
|
|
|
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing /
|
|
|
|
|
* deserializing objects.
|
|
|
|
|
*/
|
|
|
|
|
public class SerializationUtils {
|
|
|
|
|
|
|
|
|
|
// Caching kryo serializer to avoid creating kryo instance for every serde operation
|
|
|
|
|
private static final ThreadLocal<KryoSerializerInstance> serializerRef =
|
|
|
|
|
ThreadLocal.withInitial(() -> new KryoSerializerInstance());
|
|
|
|
|
|
|
|
|
|
// Serialize
|
|
|
|
|
//-----------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* <p>Serializes an {@code Object} to the specified stream.</p>
|
|
|
|
|
*
|
|
|
|
|
* <p>The stream will be closed once the object is written.
|
|
|
|
|
* This avoids the need for a finally clause, and maybe also exception
|
|
|
|
|
* handling, in the application code.</p>
|
|
|
|
|
*
|
|
|
|
|
* <p>The stream passed in is not buffered internally within this method.
|
|
|
|
|
* This is the responsibility of your application if desired.</p>
|
|
|
|
|
*
|
|
|
|
|
* @param obj the object to serialize to bytes, may be null
|
|
|
|
|
* @param outputStream the stream to write to, must not be null
|
|
|
|
|
* @throws IllegalArgumentException if {@code outputStream} is {@code null}
|
|
|
|
|
* @throws HoodieSerializationException (runtime) if the serialization fails
|
|
|
|
|
*/
|
|
|
|
|
public static void serialize(final Serializable obj, final OutputStream outputStream) {
|
|
|
|
|
if (outputStream == null) {
|
|
|
|
|
throw new IllegalArgumentException("The OutputStream must not be null");
|
|
|
|
|
}
|
|
|
|
|
ObjectOutputStream out = null;
|
|
|
|
|
try {
|
|
|
|
|
// stream closed in the finally
|
|
|
|
|
out = new ObjectOutputStream(outputStream);
|
|
|
|
|
out.writeObject(obj);
|
|
|
|
|
|
|
|
|
|
} catch (final IOException ex) {
|
|
|
|
|
throw new HoodieSerializationException("unable to serialize object", ex);
|
|
|
|
|
} finally {
|
|
|
|
|
try {
|
|
|
|
|
if (out != null) {
|
|
|
|
|
out.close();
|
|
|
|
|
}
|
|
|
|
|
} catch (final IOException ex) { // NOPMD
|
|
|
|
|
// ignore close exception
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* <p>Serializes an {@code Object} to a byte array for
|
|
|
|
|
* storage/serialization.</p>
|
|
|
|
|
* <p>Serializes an {@code Object} to a byte array for storage/serialization.</p>
|
|
|
|
|
*
|
|
|
|
|
* @param obj the object to serialize to bytes
|
|
|
|
|
* @return a byte[] with the converted Serializable
|
|
|
|
|
* @throws HoodieSerializationException (runtime) if the serialization fails
|
|
|
|
|
* @throws IOException if the serialization fails
|
|
|
|
|
*/
|
|
|
|
|
public static byte[] serialize(final Serializable obj) {
|
|
|
|
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
|
|
|
|
|
serialize(obj, baos);
|
|
|
|
|
return baos.toByteArray();
|
|
|
|
|
public static byte[] serialize(final Object obj) throws IOException {
|
|
|
|
|
return serializerRef.get().serialize(obj);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Deserialize
|
|
|
|
|
//-----------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* <p>
|
|
|
|
|
* Deserializes an {@code Object} from the specified stream.
|
|
|
|
|
* </p>
|
|
|
|
|
* <p> Deserializes a single {@code Object} from an array of bytes. </p>
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also
|
|
|
|
|
* exception handling, in the application code.
|
|
|
|
|
* </p>
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* The stream passed in is not buffered internally within this method. This is the responsibility of your
|
|
|
|
|
* application if desired.
|
|
|
|
|
* </p>
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
|
|
|
|
|
* Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
|
|
|
|
|
* Note that in both cases, the ClassCastException is in the call site, not in this method.
|
|
|
|
|
* </p>
|
|
|
|
|
*
|
|
|
|
|
* @param <T> the object type to be deserialized
|
|
|
|
|
* @param inputStream the serialized object input stream, must not be null
|
|
|
|
|
* @return the deserialized object
|
|
|
|
|
* @throws IllegalArgumentException if {@code inputStream} is {@code null}
|
|
|
|
|
* @throws HoodieSerializationException (runtime) if the serialization fails
|
|
|
|
|
*/
|
|
|
|
|
public static <T> T deserialize(final InputStream inputStream) {
|
|
|
|
|
if (inputStream == null) {
|
|
|
|
|
throw new IllegalArgumentException("The InputStream must not be null");
|
|
|
|
|
}
|
|
|
|
|
ObjectInputStream in = null;
|
|
|
|
|
try {
|
|
|
|
|
// stream closed in the finally
|
|
|
|
|
in = new ObjectInputStream(inputStream);
|
|
|
|
|
@SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect
|
|
|
|
|
final T obj = (T) in.readObject();
|
|
|
|
|
return obj;
|
|
|
|
|
|
|
|
|
|
} catch (final ClassCastException ex) {
|
|
|
|
|
throw new HoodieSerializationException("cannot cast class", ex);
|
|
|
|
|
} catch (final ClassNotFoundException ex) {
|
|
|
|
|
throw new HoodieSerializationException("class not found", ex);
|
|
|
|
|
} catch (final IOException ex) {
|
|
|
|
|
throw new HoodieSerializationException("unable to deserialize to object", ex);
|
|
|
|
|
} finally {
|
|
|
|
|
try {
|
|
|
|
|
if (in != null) {
|
|
|
|
|
in.close();
|
|
|
|
|
}
|
|
|
|
|
} catch (final IOException ex) { // NOPMD
|
|
|
|
|
// ignore close exception
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* <p>
|
|
|
|
|
* Deserializes a single {@code Object} from an array of bytes.
|
|
|
|
|
* </p>
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
|
|
|
|
|
* Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
|
|
|
|
|
* Note that in both cases, the ClassCastException is in the call site, not in this method.
|
|
|
|
|
* </p>
|
|
|
|
|
* <p> If the call site incorrectly types the return value, a {@link ClassCastException} is thrown
|
|
|
|
|
* from the call site. Without Generics in this declaration, the call site must type cast and can
|
|
|
|
|
* cause the same ClassCastException. Note that in both cases, the ClassCastException is in the
|
|
|
|
|
* call site, not in this method. </p>
|
|
|
|
|
*
|
|
|
|
|
* @param <T> the object type to be deserialized
|
|
|
|
|
* @param objectData the serialized object, must not be null
|
|
|
|
|
@@ -166,6 +76,88 @@ public class SerializationUtils {
|
|
|
|
|
if (objectData == null) {
|
|
|
|
|
throw new IllegalArgumentException("The byte[] must not be null");
|
|
|
|
|
}
|
|
|
|
|
return deserialize(new ByteArrayInputStream(objectData));
|
|
|
|
|
return (T) serializerRef.get().deserialize(objectData);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class KryoSerializerInstance implements Serializable {
|
|
|
|
|
public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
|
|
|
|
|
private final Kryo kryo;
|
|
|
|
|
// Caching ByteArrayOutputStream to avoid recreating it for every operation
|
|
|
|
|
private final ByteArrayOutputStream baos;
|
|
|
|
|
|
|
|
|
|
KryoSerializerInstance() {
|
|
|
|
|
KryoInstantiator kryoInstantiator = new KryoInstantiator();
|
|
|
|
|
kryo = kryoInstantiator.newKryo();
|
|
|
|
|
baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
|
|
|
|
|
kryo.setRegistrationRequired(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
byte[] serialize(Object obj) throws IOException {
|
|
|
|
|
kryo.reset();
|
|
|
|
|
baos.reset();
|
|
|
|
|
Output output = new Output(baos);
|
|
|
|
|
this.kryo.writeClassAndObject(output, obj);
|
|
|
|
|
output.close();
|
|
|
|
|
return baos.toByteArray();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Object deserialize(byte[] objectData) {
|
|
|
|
|
return this.kryo.readClassAndObject(new Input(objectData));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This class has a no-arg constructor, suitable for use with reflection instantiation.
|
|
|
|
|
* For Details checkout com.twitter.chill.KryoBase.
|
|
|
|
|
*/
|
|
|
|
|
private static class KryoInstantiator implements Serializable {
|
|
|
|
|
|
|
|
|
|
public Kryo newKryo() {
|
|
|
|
|
|
|
|
|
|
Kryo kryo = new KryoBase();
|
|
|
|
|
// ensure that kryo doesn't fail if classes are not registered with kryo.
|
|
|
|
|
kryo.setRegistrationRequired(false);
|
|
|
|
|
// This would be used for object initialization if nothing else works out.
|
|
|
|
|
kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
|
|
|
|
|
// Handle cases where we may have an odd classloader setup like with libjars
|
|
|
|
|
// for hadoop
|
|
|
|
|
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
|
|
|
|
|
return kryo;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class KryoBase extends Kryo {
|
|
|
|
|
@Override
|
|
|
|
|
protected Serializer newDefaultSerializer(Class type) {
|
|
|
|
|
final Serializer serializer = super.newDefaultSerializer(type);
|
|
|
|
|
if (serializer instanceof FieldSerializer) {
|
|
|
|
|
final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
|
|
|
|
|
fieldSerializer.setIgnoreSyntheticFields(true);
|
|
|
|
|
}
|
|
|
|
|
return serializer;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected ObjectInstantiator newInstantiator(Class type) {
|
|
|
|
|
return () -> {
|
|
|
|
|
// First try reflectasm - it is fastest way to instantiate an object.
|
|
|
|
|
try {
|
|
|
|
|
final ConstructorAccess access = ConstructorAccess.get(type);
|
|
|
|
|
return access.newInstance();
|
|
|
|
|
} catch (Throwable t) {
|
|
|
|
|
// ignore this exception. We may want to try other way.
|
|
|
|
|
}
|
|
|
|
|
// fall back to java based instantiation.
|
|
|
|
|
try {
|
|
|
|
|
final Constructor constructor = type.getConstructor();
|
|
|
|
|
constructor.setAccessible(true);
|
|
|
|
|
return constructor.newInstance();
|
|
|
|
|
} catch (NoSuchMethodException | IllegalAccessException
|
|
|
|
|
| InstantiationException | InvocationTargetException e) {
|
|
|
|
|
// ignore this exception. we will fall back to default instantiation strategy.
|
|
|
|
|
}
|
|
|
|
|
return super.getInstantiatorStrategy().newInstantiatorOf(type).newInstance();
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|