Revert "Replacing Apache commons-lang3 object serializer with Kryo serializer"
This reverts commit a6c45feb2c.
This commit is contained in:
committed by
Balaji Varadarajan
parent
9e7ce19b06
commit
e35d24f31d
@@ -85,7 +85,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
|
||||
int dataLength = dis.readInt();
|
||||
byte[] data = new byte[dataLength];
|
||||
dis.readFully(data);
|
||||
this.keysToDelete = SerializationUtils.<HoodieKey[]>deserialize(data);
|
||||
this.keysToDelete = SerializationUtils.deserialize(data);
|
||||
deflate();
|
||||
}
|
||||
return keysToDelete;
|
||||
|
||||
@@ -16,50 +16,145 @@
|
||||
|
||||
package com.uber.hoodie.common.util;
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo;
|
||||
import com.esotericsoftware.kryo.io.Input;
|
||||
import com.esotericsoftware.kryo.io.Output;
|
||||
import com.twitter.chill.EmptyScalaKryoInstantiator;
|
||||
import com.uber.hoodie.exception.HoodieSerializationException;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Serializable;
|
||||
|
||||
|
||||
/**
|
||||
* {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing /
|
||||
* deserializing objects.
|
||||
* (NOTE: Adapted from Apache commons-lang3)
|
||||
* This class defines API's to serde an object.
|
||||
*/
|
||||
public class SerializationUtils {
|
||||
|
||||
// Caching kryo serializer to avoid creating kryo instance for every serde operation
|
||||
private static final ThreadLocal<KryoSerializerInstance> serializerRef =
|
||||
ThreadLocal.withInitial(() -> new KryoSerializerInstance());
|
||||
|
||||
// Serialize
|
||||
//-----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* <p>Serializes an {@code Object} to a byte array for storage/serialization.</p>
|
||||
* <p>Serializes an {@code Object} to the specified stream.</p>
|
||||
*
|
||||
* <p>The stream will be closed once the object is written.
|
||||
* This avoids the need for a finally clause, and maybe also exception
|
||||
* handling, in the application code.</p>
|
||||
*
|
||||
* <p>The stream passed in is not buffered internally within this method.
|
||||
* This is the responsibility of your application if desired.</p>
|
||||
*
|
||||
* @param obj the object to serialize to bytes, may be null
|
||||
* @param outputStream the stream to write to, must not be null
|
||||
* @throws IllegalArgumentException if {@code outputStream} is {@code null}
|
||||
* @throws HoodieSerializationException (runtime) if the serialization fails
|
||||
*/
|
||||
public static void serialize(final Serializable obj, final OutputStream outputStream) {
|
||||
if (outputStream == null) {
|
||||
throw new IllegalArgumentException("The OutputStream must not be null");
|
||||
}
|
||||
ObjectOutputStream out = null;
|
||||
try {
|
||||
// stream closed in the finally
|
||||
out = new ObjectOutputStream(outputStream);
|
||||
out.writeObject(obj);
|
||||
|
||||
} catch (final IOException ex) {
|
||||
throw new HoodieSerializationException("unable to serialize object", ex);
|
||||
} finally {
|
||||
try {
|
||||
if (out != null) {
|
||||
out.close();
|
||||
}
|
||||
} catch (final IOException ex) { // NOPMD
|
||||
// ignore close exception
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Serializes an {@code Object} to a byte array for
|
||||
* storage/serialization.</p>
|
||||
*
|
||||
* @param obj the object to serialize to bytes
|
||||
* @return a byte[] with the converted Serializable
|
||||
* @throws IOException if the serialization fails
|
||||
* @throws HoodieSerializationException (runtime) if the serialization fails
|
||||
*/
|
||||
public static byte[] serialize(final Object obj) throws IOException {
|
||||
return serializerRef.get().serialize(obj);
|
||||
public static byte[] serialize(final Serializable obj) {
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
|
||||
serialize(obj, baos);
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
// Deserialize
|
||||
//-----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* <p> Deserializes a single {@code Object} from an array of bytes. </p>
|
||||
* <p>
|
||||
* Deserializes an {@code Object} from the specified stream.
|
||||
* </p>
|
||||
*
|
||||
* <p> If the call site incorrectly types the return value, a {@link ClassCastException} is thrown
|
||||
* from the call site. Without Generics in this declaration, the call site must type cast and can
|
||||
* cause the same ClassCastException. Note that in both cases, the ClassCastException is in the
|
||||
* call site, not in this method. </p>
|
||||
* <p>
|
||||
* The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also
|
||||
* exception handling, in the application code.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* The stream passed in is not buffered internally within this method. This is the responsibility of your
|
||||
* application if desired.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
|
||||
* Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
|
||||
* Note that in both cases, the ClassCastException is in the call site, not in this method.
|
||||
* </p>
|
||||
*
|
||||
* @param <T> the object type to be deserialized
|
||||
* @param inputStream the serialized object input stream, must not be null
|
||||
* @return the deserialized object
|
||||
* @throws IllegalArgumentException if {@code inputStream} is {@code null}
|
||||
* @throws HoodieSerializationException (runtime) if the serialization fails
|
||||
*/
|
||||
public static <T> T deserialize(final InputStream inputStream) {
|
||||
if (inputStream == null) {
|
||||
throw new IllegalArgumentException("The InputStream must not be null");
|
||||
}
|
||||
ObjectInputStream in = null;
|
||||
try {
|
||||
// stream closed in the finally
|
||||
in = new ObjectInputStream(inputStream);
|
||||
@SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect
|
||||
final T obj = (T) in.readObject();
|
||||
return obj;
|
||||
|
||||
} catch (final ClassCastException ex) {
|
||||
throw new HoodieSerializationException("cannot cast class", ex);
|
||||
} catch (final ClassNotFoundException ex) {
|
||||
throw new HoodieSerializationException("class not found", ex);
|
||||
} catch (final IOException ex) {
|
||||
throw new HoodieSerializationException("unable to deserialize to object", ex);
|
||||
} finally {
|
||||
try {
|
||||
if (in != null) {
|
||||
in.close();
|
||||
}
|
||||
} catch (final IOException ex) { // NOPMD
|
||||
// ignore close exception
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Deserializes a single {@code Object} from an array of bytes.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
|
||||
* Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
|
||||
* Note that in both cases, the ClassCastException is in the call site, not in this method.
|
||||
* </p>
|
||||
*
|
||||
* @param <T> the object type to be deserialized
|
||||
* @param objectData the serialized object, must not be null
|
||||
@@ -71,33 +166,6 @@ public class SerializationUtils {
|
||||
if (objectData == null) {
|
||||
throw new IllegalArgumentException("The byte[] must not be null");
|
||||
}
|
||||
return (T) serializerRef.get().deserialize(objectData);
|
||||
}
|
||||
|
||||
private static class KryoSerializerInstance implements Serializable {
|
||||
public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
|
||||
private final Kryo kryo;
|
||||
// Caching ByteArrayOutputStream to avoid recreating it for every operation
|
||||
private final ByteArrayOutputStream baos;
|
||||
|
||||
KryoSerializerInstance() {
|
||||
EmptyScalaKryoInstantiator kryoInstantiator = new EmptyScalaKryoInstantiator();
|
||||
kryo = kryoInstantiator.newKryo();
|
||||
baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
|
||||
kryo.setRegistrationRequired(false);
|
||||
}
|
||||
|
||||
byte[] serialize(Object obj) throws IOException {
|
||||
kryo.reset();
|
||||
baos.reset();
|
||||
Output output = new Output(baos);
|
||||
this.kryo.writeClassAndObject(output, obj);
|
||||
output.close();
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
Object deserialize(byte[] objectData) {
|
||||
return this.kryo.readClassAndObject(new Input(objectData));
|
||||
}
|
||||
return deserialize(new ByteArrayInputStream(objectData));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,8 +156,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return SerializationUtils.<R>deserialize(SpillableMapUtils
|
||||
.readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue()));
|
||||
return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
|
||||
entry.getOffsetOfValue(), entry.getSizeOfValue()));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
|
||||
}
|
||||
|
||||
@@ -81,9 +81,8 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
|
||||
public R next() {
|
||||
Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
|
||||
try {
|
||||
return SerializationUtils.<R>deserialize(SpillableMapUtils
|
||||
.readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(),
|
||||
entry.getValue().getSizeOfValue()));
|
||||
return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
|
||||
entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user