diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index 88274265f..912f68d24 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -146,10 +146,5 @@ objectsize 0.0.12 - - com.twitter - chill_2.11 - 0.8.0 - diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java index a31b51279..9ee4f296b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java @@ -85,7 +85,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { int dataLength = dis.readInt(); byte[] data = new byte[dataLength]; dis.readFully(data); - this.keysToDelete = SerializationUtils.deserialize(data); + this.keysToDelete = SerializationUtils.deserialize(data); deflate(); } return keysToDelete; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java index ca293942d..c52a66674 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java @@ -16,50 +16,145 @@ package com.uber.hoodie.common.util; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.twitter.chill.EmptyScalaKryoInstantiator; import com.uber.hoodie.exception.HoodieSerializationException; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; import java.io.Serializable; - /** - * {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing / - * deserializing objects. + * (NOTE: Adapted from Apache commons-lang3) + * This class defines API's to serde an object. */ public class SerializationUtils { - - // Caching kryo serializer to avoid creating kryo instance for every serde operation - private static final ThreadLocal serializerRef = - ThreadLocal.withInitial(() -> new KryoSerializerInstance()); - // Serialize //----------------------------------------------------------------------- /** - *

Serializes an {@code Object} to a byte array for storage/serialization.

+ *

Serializes an {@code Object} to the specified stream.

+ * + *

The stream will be closed once the object is written. + * This avoids the need for a finally clause, and maybe also exception + * handling, in the application code.

+ * + *

The stream passed in is not buffered internally within this method. + * This is the responsibility of your application if desired.

+ * + * @param obj the object to serialize to bytes, may be null + * @param outputStream the stream to write to, must not be null + * @throws IllegalArgumentException if {@code outputStream} is {@code null} + * @throws HoodieSerializationException (runtime) if the serialization fails + */ + public static void serialize(final Serializable obj, final OutputStream outputStream) { + if (outputStream == null) { + throw new IllegalArgumentException("The OutputStream must not be null"); + } + ObjectOutputStream out = null; + try { + // stream closed in the finally + out = new ObjectOutputStream(outputStream); + out.writeObject(obj); + + } catch (final IOException ex) { + throw new HoodieSerializationException("unable to serialize object", ex); + } finally { + try { + if (out != null) { + out.close(); + } + } catch (final IOException ex) { // NOPMD + // ignore close exception + } + } + } + + /** + *

Serializes an {@code Object} to a byte array for + * storage/serialization.

* * @param obj the object to serialize to bytes * @return a byte[] with the converted Serializable - * @throws IOException if the serialization fails + * @throws HoodieSerializationException (runtime) if the serialization fails */ - public static byte[] serialize(final Object obj) throws IOException { - return serializerRef.get().serialize(obj); + public static byte[] serialize(final Serializable obj) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + serialize(obj, baos); + return baos.toByteArray(); } // Deserialize //----------------------------------------------------------------------- /** - *

Deserializes a single {@code Object} from an array of bytes.

+ *

+ * Deserializes an {@code Object} from the specified stream. + *

* - *

If the call site incorrectly types the return value, a {@link ClassCastException} is thrown - * from the call site. Without Generics in this declaration, the call site must type cast and can - * cause the same ClassCastException. Note that in both cases, the ClassCastException is in the - * call site, not in this method.

+ *

+ * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also + * exception handling, in the application code. + *

+ * + *

+ * The stream passed in is not buffered internally within this method. This is the responsibility of your + * application if desired. + *

+ * + *

+ * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site. + * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException. + * Note that in both cases, the ClassCastException is in the call site, not in this method. + *

+ * + * @param the object type to be deserialized + * @param inputStream the serialized object input stream, must not be null + * @return the deserialized object + * @throws IllegalArgumentException if {@code inputStream} is {@code null} + * @throws HoodieSerializationException (runtime) if the serialization fails + */ + public static T deserialize(final InputStream inputStream) { + if (inputStream == null) { + throw new IllegalArgumentException("The InputStream must not be null"); + } + ObjectInputStream in = null; + try { + // stream closed in the finally + in = new ObjectInputStream(inputStream); + @SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect + final T obj = (T) in.readObject(); + return obj; + + } catch (final ClassCastException ex) { + throw new HoodieSerializationException("cannot cast class", ex); + } catch (final ClassNotFoundException ex) { + throw new HoodieSerializationException("class not found", ex); + } catch (final IOException ex) { + throw new HoodieSerializationException("unable to deserialize to object", ex); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (final IOException ex) { // NOPMD + // ignore close exception + } + } + } + + /** + *

+ * Deserializes a single {@code Object} from an array of bytes. + *

+ * + *

+ * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site. + * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException. + * Note that in both cases, the ClassCastException is in the call site, not in this method. + *

* * @param the object type to be deserialized * @param objectData the serialized object, must not be null @@ -71,33 +166,6 @@ public class SerializationUtils { if (objectData == null) { throw new IllegalArgumentException("The byte[] must not be null"); } - return (T) serializerRef.get().deserialize(objectData); - } - - private static class KryoSerializerInstance implements Serializable { - public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576; - private final Kryo kryo; - // Caching ByteArrayOutputStream to avoid recreating it for every operation - private final ByteArrayOutputStream baos; - - KryoSerializerInstance() { - EmptyScalaKryoInstantiator kryoInstantiator = new EmptyScalaKryoInstantiator(); - kryo = kryoInstantiator.newKryo(); - baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE); - kryo.setRegistrationRequired(false); - } - - byte[] serialize(Object obj) throws IOException { - kryo.reset(); - baos.reset(); - Output output = new Output(baos); - this.kryo.writeClassAndObject(output, obj); - output.close(); - return baos.toByteArray(); - } - - Object deserialize(byte[] objectData) { - return this.kryo.readClassAndObject(new Input(objectData)); - } + return deserialize(new ByteArrayInputStream(objectData)); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java index 9fd0091fa..047b99796 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -156,8 +156,8 @@ public final class DiskBasedMap return null; } try { - return SerializationUtils.deserialize(SpillableMapUtils - .readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue())); + return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, + entry.getOffsetOfValue(), entry.getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java index 08aa78494..c2d74e0bd 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java @@ -81,9 +81,8 @@ public class LazyFileIterable implements Iterable { public R next() { Map.Entry entry = this.metadataIterator.next(); try { - return SerializationUtils.deserialize(SpillableMapUtils - .readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(), - entry.getValue().getSizeOfValue())); + return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle, + entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue())); } catch (IOException e) { throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e); } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java deleted file mode 100644 index a458f43da..000000000 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.common.util; - -import java.io.IOException; -import java.util.Arrays; -import java.util.LinkedList; -import org.apache.avro.util.Utf8; -import org.junit.Assert; -import org.junit.Test; - -public class TestSerializationUtils { - - @Test - public void testSerDeser() throws IOException { - // It should handle null object references. - verifyObject(null); - // Object with nulls. - verifyObject(new NonSerializableClass(null)); - // Object with valid values & no default constructor. - verifyObject(new NonSerializableClass("testValue")); - // Object which is of non-serializable class. - verifyObject(new Utf8("test-key")); - // Verify serialization of list. - verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5))); - } - - private void verifyObject(T expectedValue) throws IOException { - byte[] serializedObject = SerializationUtils.serialize(expectedValue); - Assert.assertTrue(serializedObject != null && serializedObject.length > 0); - - final T deserializedValue = SerializationUtils.deserialize(serializedObject); - if (expectedValue == null) { - Assert.assertNull(deserializedValue); - } else { - Assert.assertTrue(expectedValue.equals(deserializedValue)); - } - } - - private static class NonSerializableClass { - private String id; - - NonSerializableClass(String id) { - this.id = id; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof NonSerializableClass)) { - return false; - } - return id == null ? ((NonSerializableClass) obj).id == null - : id.equals(((NonSerializableClass) obj).id); - } - } -} diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index 7dffa25d0..c95950a4d 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -81,8 +81,6 @@ org.apache.hive:hive-service org.apache.hive:hive-metastore org.apache.hive:hive-jdbc - com.twitter:chill_2.11 - com.twitter:chill-java @@ -122,10 +120,6 @@ org.apache.hadoop.hive.service. com.uber.hoodie.org.apache.hadoop_hive.service. - - com.twitter.chill. - com.uber.hoodie.com.twitter.chill. - diff --git a/packaging/hoodie-hadoop-mr-bundle/pom.xml b/packaging/hoodie-hadoop-mr-bundle/pom.xml index 079236379..4eeb8a2ea 100644 --- a/packaging/hoodie-hadoop-mr-bundle/pom.xml +++ b/packaging/hoodie-hadoop-mr-bundle/pom.xml @@ -195,10 +195,6 @@ org.apache.commons com.uber.hoodie.org.apache.commons - - com.twitter.chill. - com.uber.hoodie.com.twitter.chill. - false @@ -209,9 +205,6 @@ com.twitter:parquet-hadoop-bundle com.twitter.common:objectsize commons-logging:commons-logging - com.twitter:chill_2.11 - com.twitter:chill-java - com.esotericsoftware:kryo-shaded ${project.artifactId}-${project.version} diff --git a/packaging/hoodie-hive-bundle/pom.xml b/packaging/hoodie-hive-bundle/pom.xml index aa9bd9f34..61462360f 100644 --- a/packaging/hoodie-hive-bundle/pom.xml +++ b/packaging/hoodie-hive-bundle/pom.xml @@ -199,10 +199,6 @@ parquet.schema. com.uber.hoodie.parquet.schema. - - com.twitter.chill. - com.uber.hoodie.com.twitter.chill. - false diff --git a/packaging/hoodie-presto-bundle/pom.xml b/packaging/hoodie-presto-bundle/pom.xml index d90b0f0f1..750fba9a8 100644 --- a/packaging/hoodie-presto-bundle/pom.xml +++ b/packaging/hoodie-presto-bundle/pom.xml @@ -161,10 +161,6 @@ parquet.schema. com.uber.hoodie.parquet.schema. - - com.twitter.chill. - com.uber.hoodie.com.twitter.chill. - false diff --git a/packaging/hoodie-spark-bundle/pom.xml b/packaging/hoodie-spark-bundle/pom.xml index 2205334a4..f3b45f498 100644 --- a/packaging/hoodie-spark-bundle/pom.xml +++ b/packaging/hoodie-spark-bundle/pom.xml @@ -148,10 +148,6 @@ org.apache.hadoop.hive.service. com.uber.hoodie.org.apache.hadoop_hive.service. - - com.twitter.chill. - com.uber.hoodie.com.twitter.chill. - false