1
0

[HUDI-159]: Pom cleanup and removal of com.twitter.parquet

- Redo all classes based on org.parquet only
 - remove unuused dependencies like parquet-hadoop, common-configuration2
 - timeline-service does not build a fat jar anymore
 - Fix utilities and hadoop-mr bundles based on above
This commit is contained in:
vinoth chandar
2019-08-25 05:34:51 -07:00
committed by Balaji Varadarajan
parent 6edf0b9def
commit cd090871a1
29 changed files with 600 additions and 326 deletions

View File

@@ -166,6 +166,11 @@
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -188,12 +193,6 @@
<version>${spring.shell.version}</version>
</dependency>
<dependency>
<groupId>de.vandermeer</groupId>
<artifactId>asciitable</artifactId>
<version>0.2.5</version>
</dependency>
<dependency>
<groupId>com.jakewharton.fliptables</groupId>
<artifactId>fliptables</artifactId>
@@ -214,12 +213,5 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
<version>${junit-dep.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -18,8 +18,8 @@
package org.apache.hudi.cli.commands;
import com.beust.jcommander.internal.Maps;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,12 +50,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.hive.util.SchemaUtil;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import parquet.avro.AvroSchemaConverter;
import scala.Tuple2;
import scala.Tuple3;

View File

@@ -91,10 +91,6 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<!-- Spark -->
<dependency>
@@ -110,6 +106,12 @@
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
@@ -125,12 +127,7 @@
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.48</version>
</dependency>
<dependency>
<groupId>org.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.0.4</version>
<scope>test</scope>
</dependency>
<!-- Hadoop -->

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.io;
import com.beust.jcommander.internal.Maps;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

View File

@@ -21,8 +21,8 @@ package org.apache.hudi.io.strategy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -248,7 +248,7 @@ public class TestHoodieCompactionStrategy {
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) {
List<HoodieCompactionOperation> operations = Lists.newArrayList(sizesMap.size());
List<HoodieCompactionOperation> operations = new ArrayList<>(sizesMap.size());
sizesMap.forEach((k, v) -> {
HoodieDataFile df = TestHoodieDataFile.newDataFile(k);

View File

@@ -111,13 +111,6 @@
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Twitter -->
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>objectsize</artifactId>
<version>0.0.12</version>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>commons-codec</groupId>
@@ -154,7 +147,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@@ -173,11 +171,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.common.util;
import com.twitter.common.objectsize.ObjectSizeCalculator;
/**
* Default implementation of size-estimator that uses Twitter's ObjectSizeCalculator
* @param <T>

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.util;
import com.twitter.common.objectsize.ObjectSizeCalculator;
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

View File

@@ -0,0 +1,478 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// COPIED FROM https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/objectsize/
// ObjectSizeCalculator.java
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================
package org.apache.hudi.common.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
/**
* Contains utility methods for calculating the memory usage of objects. It
* only works on the HotSpot JVM, and infers the actual memory layout (32 bit
* vs. 64 bit word size, compressed object pointers vs. uncompressed) from
* best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM.
* It can only make an educated guess at whether compressed OOPs are used,
* though; specifically, it knows what the JVM's default choice of OOP
* compression would be based on HotSpot version and maximum heap sizes, but if
* the choice is explicitly overridden with the <tt>-XX:{+|-}UseCompressedOops</tt> command line
* switch, it can not detect
* this fact and will report incorrect sizes, as it will presume the default JVM
* behavior.
*
* @author Attila Szegedi
*/
public class ObjectSizeCalculator {
/**
* Describes constant memory overheads for various constructs in a JVM implementation.
*/
public interface MemoryLayoutSpecification {
/**
* Returns the fixed overhead of an array of any type or length in this JVM.
*
* @return the fixed overhead of an array.
*/
int getArrayHeaderSize();
/**
* Returns the fixed overhead of for any {@link Object} subclass in this JVM.
*
* @return the fixed overhead of any object.
*/
int getObjectHeaderSize();
/**
* Returns the quantum field size for a field owned by an object in this JVM.
*
* @return the quantum field size for an object.
*/
int getObjectPadding();
/**
* Returns the fixed size of an object reference in this JVM.
*
* @return the size of all object references.
*/
int getReferenceSize();
/**
* Returns the quantum field size for a field owned by one of an object's ancestor superclasses
* in this JVM.
*
* @return the quantum field size for a superclass field.
*/
int getSuperclassFieldPadding();
}
private static class CurrentLayout {
private static final MemoryLayoutSpecification SPEC =
getEffectiveMemoryLayoutSpecification();
}
/**
* Given an object, returns the total allocated size, in bytes, of the object
* and all other objects reachable from it. Attempts to to detect the current JVM memory layout,
* but may fail with {@link UnsupportedOperationException};
*
* @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
* anything special, it measures the size of all objects
* reachable through it (which will include its class loader, and by
* extension, all other Class objects loaded by
* the same loader, and all the parent class loaders). It doesn't provide the
* size of the static fields in the JVM class that the Class object
* represents.
* @return the total allocated size of the object and all other objects it
* retains.
* @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
*/
public static long getObjectSize(Object obj) throws UnsupportedOperationException {
return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
}
// Fixed object header size for arrays.
private final int arrayHeaderSize;
// Fixed object header size for non-array objects.
private final int objectHeaderSize;
// Padding for the object size - if the object size is not an exact multiple
// of this, it is padded to the next multiple.
private final int objectPadding;
// Size of reference (pointer) fields.
private final int referenceSize;
// Padding for the fields of superclass before fields of subclasses are
// added.
private final int superclassFieldPadding;
private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos =
CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() {
public ClassSizeInfo load(Class<?> clazz) {
return new ClassSizeInfo(clazz);
}
});
private final Set<Object> alreadyVisited = Sets.newIdentityHashSet();
private final Deque<Object> pending = new ArrayDeque<Object>(16 * 1024);
private long size;
/**
* Creates an object size calculator that can calculate object sizes for a given
* {@code memoryLayoutSpecification}.
*
* @param memoryLayoutSpecification a description of the JVM memory layout.
*/
public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) {
Preconditions.checkNotNull(memoryLayoutSpecification);
arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize();
objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize();
objectPadding = memoryLayoutSpecification.getObjectPadding();
referenceSize = memoryLayoutSpecification.getReferenceSize();
superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding();
}
/**
* Given an object, returns the total allocated size, in bytes, of the object
* and all other objects reachable from it.
*
* @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do
* anything special, it measures the size of all objects
* reachable through it (which will include its class loader, and by
* extension, all other Class objects loaded by
* the same loader, and all the parent class loaders). It doesn't provide the
* size of the static fields in the JVM class that the Class object
* represents.
* @return the total allocated size of the object and all other objects it
* retains.
*/
public synchronized long calculateObjectSize(Object obj) {
// Breadth-first traversal instead of naive depth-first with recursive
// implementation, so we don't blow the stack traversing long linked lists.
try {
for (; ; ) {
visit(obj);
if (pending.isEmpty()) {
return size;
}
obj = pending.removeFirst();
}
} finally {
alreadyVisited.clear();
pending.clear();
size = 0;
}
}
private void visit(Object obj) {
if (alreadyVisited.contains(obj)) {
return;
}
final Class<?> clazz = obj.getClass();
if (clazz == ArrayElementsVisitor.class) {
((ArrayElementsVisitor) obj).visit(this);
} else {
alreadyVisited.add(obj);
if (clazz.isArray()) {
visitArray(obj);
} else {
classSizeInfos.getUnchecked(clazz).visit(obj, this);
}
}
}
private void visitArray(Object array) {
final Class<?> componentType = array.getClass().getComponentType();
final int length = Array.getLength(array);
if (componentType.isPrimitive()) {
increaseByArraySize(length, getPrimitiveFieldSize(componentType));
} else {
increaseByArraySize(length, referenceSize);
// If we didn't use an ArrayElementsVisitor, we would be enqueueing every
// element of the array here instead. For large arrays, it would
// tremendously enlarge the queue. In essence, we're compressing it into
// a small command object instead. This is different than immediately
// visiting the elements, as their visiting is scheduled for the end of
// the current queue.
switch (length) {
case 0: {
break;
}
case 1: {
enqueue(Array.get(array, 0));
break;
}
default: {
enqueue(new ArrayElementsVisitor((Object[]) array));
}
}
}
}
private void increaseByArraySize(int length, long elementSize) {
increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding));
}
private static class ArrayElementsVisitor {
private final Object[] array;
ArrayElementsVisitor(Object[] array) {
this.array = array;
}
public void visit(ObjectSizeCalculator calc) {
for (Object elem : array) {
if (elem != null) {
calc.visit(elem);
}
}
}
}
void enqueue(Object obj) {
if (obj != null) {
pending.addLast(obj);
}
}
void increaseSize(long objectSize) {
size += objectSize;
}
@VisibleForTesting
static long roundTo(long x, int multiple) {
return ((x + multiple - 1) / multiple) * multiple;
}
private class ClassSizeInfo {
// Padded fields + header size
private final long objectSize;
// Only the fields size - used to calculate the subclasses' memory
// footprint.
private final long fieldsSize;
private final Field[] referenceFields;
public ClassSizeInfo(Class<?> clazz) {
long fieldsSize = 0;
final List<Field> referenceFields = new LinkedList<Field>();
for (Field f : clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers())) {
continue;
}
final Class<?> type = f.getType();
if (type.isPrimitive()) {
fieldsSize += getPrimitiveFieldSize(type);
} else {
f.setAccessible(true);
referenceFields.add(f);
fieldsSize += referenceSize;
}
}
final Class<?> superClass = clazz.getSuperclass();
if (superClass != null) {
final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass);
fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding);
referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields));
}
this.fieldsSize = fieldsSize;
this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding);
this.referenceFields = referenceFields.toArray(
new Field[referenceFields.size()]);
}
void visit(Object obj, ObjectSizeCalculator calc) {
calc.increaseSize(objectSize);
enqueueReferencedObjects(obj, calc);
}
public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) {
for (Field f : referenceFields) {
try {
calc.enqueue(f.get(obj));
} catch (IllegalAccessException e) {
final AssertionError ae = new AssertionError(
"Unexpected denial of access to " + f);
ae.initCause(e);
throw ae;
}
}
}
}
private static long getPrimitiveFieldSize(Class<?> type) {
if (type == boolean.class || type == byte.class) {
return 1;
}
if (type == char.class || type == short.class) {
return 2;
}
if (type == int.class || type == float.class) {
return 4;
}
if (type == long.class || type == double.class) {
return 8;
}
throw new AssertionError("Encountered unexpected primitive type "
+ type.getName());
}
@VisibleForTesting
static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
final String vmName = System.getProperty("java.vm.name");
if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ")
|| vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) {
throw new UnsupportedOperationException(
"ObjectSizeCalculator only supported on HotSpot VM");
}
final String dataModel = System.getProperty("sun.arch.data.model");
if ("32".equals(dataModel)) {
// Running with 32-bit data model
return new MemoryLayoutSpecification() {
@Override
public int getArrayHeaderSize() {
return 12;
}
@Override
public int getObjectHeaderSize() {
return 8;
}
@Override
public int getObjectPadding() {
return 8;
}
@Override
public int getReferenceSize() {
return 4;
}
@Override
public int getSuperclassFieldPadding() {
return 4;
}
};
} else if (!"64".equals(dataModel)) {
throw new UnsupportedOperationException("Unrecognized value '"
+ dataModel + "' of sun.arch.data.model system property");
}
final String strVmVersion = System.getProperty("java.vm.version");
final int vmVersion = Integer.parseInt(strVmVersion.substring(0,
strVmVersion.indexOf('.')));
if (vmVersion >= 17) {
long maxMemory = 0;
for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
maxMemory += mp.getUsage().getMax();
}
if (maxMemory < 30L * 1024 * 1024 * 1024) {
// HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total
// for all memory pools (yes, including code cache).
return new MemoryLayoutSpecification() {
@Override
public int getArrayHeaderSize() {
return 16;
}
@Override
public int getObjectHeaderSize() {
return 12;
}
@Override
public int getObjectPadding() {
return 8;
}
@Override
public int getReferenceSize() {
return 4;
}
@Override
public int getSuperclassFieldPadding() {
return 4;
}
};
}
}
// In other cases, it's a 64-bit uncompressed OOPs object model
return new MemoryLayoutSpecification() {
@Override
public int getArrayHeaderSize() {
return 24;
}
@Override
public int getObjectHeaderSize() {
return 16;
}
@Override
public int getObjectPadding() {
return 8;
}
@Override
public int getReferenceSize() {
return 8;
}
@Override
public int getSuperclassFieldPadding() {
return 8;
}
};
}
}

View File

@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
@@ -71,7 +70,7 @@ public class ParquetUtils {
*/
public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
Option<RecordKeysFilterFunction> filterFunction = Option.empty();
if (CollectionUtils.isNotEmpty(filter)) {
if (filter != null && !filter.isEmpty()) {
filterFunction = Option.of(new RecordKeysFilterFunction(filter));
}
Configuration conf = new Configuration(configuration);

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.util.collection;
import com.twitter.common.objectsize.ObjectSizeCalculator;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -30,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;

View File

@@ -49,28 +49,12 @@
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Parquet (Twitter) -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>objectsize</artifactId>
<version>0.0.12</version>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -113,12 +97,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@@ -53,9 +53,9 @@ import org.apache.hudi.common.util.LogReaderUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
/**
* Record Reader implementation to merge fresh avro data with base parquet data, to support real

View File

@@ -46,17 +46,12 @@
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<!-- Parquet (Twitter) -->
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
@@ -65,12 +60,6 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@@ -95,16 +84,6 @@
<artifactId>jcommander</artifactId>
</dependency>
<!-- Httpcomponents -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -175,15 +154,15 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>

View File

@@ -36,9 +36,9 @@ import org.apache.hudi.hadoop.realtime.HoodieRealtimeInputFormat;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.schema.MessageType;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
/**
@@ -52,7 +52,7 @@ import parquet.schema.MessageType;
@SuppressWarnings("WeakerAccess")
public class HiveSyncTool {
private static final Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
private static Logger LOG = LogManager.getLogger(HiveSyncTool.class);
private final HoodieHiveClient hoodieHiveClient;
public static final String SUFFIX_REALTIME_TABLE = "_rt";
private final HiveSyncConfig cfg;

View File

@@ -57,13 +57,13 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidDatasetException;
import org.apache.hudi.hive.util.SchemaUtil;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
@SuppressWarnings("ConstantConditions")
public class HoodieHiveClient {

View File

@@ -25,7 +25,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import parquet.schema.MessageType;
import org.apache.parquet.schema.MessageType;
/**
* Represents the schema difference between the storage schema and hive table schema

View File

@@ -36,14 +36,15 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import parquet.schema.DecimalMetadata;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType;
import parquet.schema.Type;
/**
* Schema Utilities
@@ -439,6 +440,7 @@ public class SchemaUtil {
/**
* Read the schema from the log file on path
* @return
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
@@ -452,7 +454,7 @@ public class SchemaUtil {
}
reader.close();
if (lastBlock != null) {
return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
return new AvroSchemaConverter().convert(lastBlock.getSchema());
}
return null;
}

View File

@@ -32,12 +32,13 @@ import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.util.SchemaUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType;
@SuppressWarnings("ConstantConditions")
public class HiveSyncToolTest {
@@ -59,8 +60,8 @@ public class HiveSyncToolTest {
@Test
public void testSchemaConvertArray() throws IOException {
// Testing the 3-level annotation structure
MessageType schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup()
MessageType schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup()
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element")
.named("list").named("int_list").named("ArrayOfInts");
@@ -68,8 +69,8 @@ public class HiveSyncToolTest {
assertEquals("`int_list` ARRAY< int>", schemaString);
// A array of arrays
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup().requiredGroup()
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup().requiredGroup()
.as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list")
.named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
@@ -78,8 +79,8 @@ public class HiveSyncToolTest {
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
// A list of integers
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST)
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST)
.repeated(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("int_list")
.named("ArrayOfInts");
@@ -87,8 +88,8 @@ public class HiveSyncToolTest {
assertEquals("`int_list` ARRAY< int>", schemaString);
// A list of structs with two fields
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup()
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
.required(PrimitiveType.PrimitiveTypeName.INT32).named("num").named("element")
.named("tuple_list").named("ArrayOfTuples");
@@ -99,8 +100,8 @@ public class HiveSyncToolTest {
// A list of structs with a single field
// For this case, since the inner group name is "array", we treat the
// element type as a one-element struct.
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup()
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array")
.named("one_tuple_list").named("ArrayOfOneTuples");
@@ -110,8 +111,8 @@ public class HiveSyncToolTest {
// A list of structs with a single field
// For this case, since the inner group name ends with "_tuple", we also treat the
// element type as a one-element struct.
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup()
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
.named("one_tuple_list_tuple").named("one_tuple_list").named("ArrayOfOneTuples2");
@@ -121,8 +122,8 @@ public class HiveSyncToolTest {
// A list of structs with a single field
// Unlike the above two cases, for this the element type is the type of the
// only field in the struct.
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup()
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str")
.named("one_tuple_list").named("one_tuple_list").named("ArrayOfOneTuples3");
@@ -130,8 +131,8 @@ public class HiveSyncToolTest {
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
// A list of maps
schema = parquet.schema.Types.buildMessage().optionalGroup()
.as(parquet.schema.OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
schema = Types.buildMessage().optionalGroup()
.as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
.repeatedGroup().as(OriginalType.MAP_KEY_VALUE)
.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
.named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32)

View File

@@ -41,6 +41,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
@@ -52,12 +54,10 @@ import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveTestService {
private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
private static Logger LOG = LogManager.getLogger(HiveTestService.class);
private static final int CONNECTION_TIMEOUT = 30000;

View File

@@ -203,6 +203,12 @@
<artifactId>avro</artifactId>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -218,12 +224,7 @@
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<scope>provided</scope>
</dependency>
<!-- Hadoop -->
@@ -293,8 +294,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
<version>${junit-dep.version}</version>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

View File

@@ -55,59 +55,6 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>true</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.hudi.timeline.service.TimelineService</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
@@ -244,11 +191,5 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -76,18 +76,20 @@
</repositories>
<dependencies>
<!-- Jetty -->
<dependency>
<!-- Needs to be at the top to ensure we get the correct dependency versions for jetty-server -->
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<version>7.6.0.v20120127</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>7.6.0.v20120127</version>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<!-- Hoodie -->
@@ -129,10 +131,6 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- Fasterxml -->
<dependency>
@@ -151,10 +149,6 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<!-- Spark -->
<dependency>
@@ -167,6 +161,7 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
@@ -177,6 +172,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
@@ -194,24 +196,6 @@
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Used for SQL templating -->
<dependency>
<groupId>org.antlr</groupId>
@@ -252,24 +236,6 @@
<version>3.0.0</version>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<!-- Httpcomponents -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
@@ -295,11 +261,13 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
@@ -330,6 +298,10 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@@ -25,6 +25,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-hadoop-mr-bundle</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- Hoodie -->
@@ -58,16 +59,6 @@
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Parquet (Twitter) -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</dependency>
<!-- Apache Commons -->
<dependency>
<groupId>commons-logging</groupId>
@@ -82,12 +73,6 @@
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>objectsize</artifactId>
<version>0.0.12</version>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -239,15 +224,16 @@
<includes>
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>com.twitter:parquet-avro</include>
<include>com.twitter:parquet-hadoop-bundle</include>
<include>com.twitter.common:objectsize</include>
<include>commons-logging:commons-logging</include>
<include>commons-io:commons-io</include>
<include>commons-lang:commons-lang</include>
<include>commons-pool:commons-pool</include>
<include>commons-codec:commons-codec</include>
<include>com.esotericsoftware:kryo-shaded</include>
<include>org.objenesis:objenesis</include>
<include>com.esotericsoftware:minlog</include>
<include>commons-codec:commons-codec</include>
<include>org.apache.parquet:parquet-avro</include>
</includes>
</artifactSet>
<filters>

View File

@@ -62,12 +62,6 @@
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- Parquet (Twitter) -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>

View File

@@ -50,12 +50,6 @@
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- Parquet (Twitter) -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>

View File

@@ -296,10 +296,6 @@
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
</dependency>
<!-- Hadoop -->
<dependency>

View File

@@ -75,11 +75,22 @@
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>org.apache.hudi:hudi-timeline-service</include>
<include>com.beust:jcommander</include>
<include>io.javalin:javalin</include>
<include>org.jetbrains.kotlin:*</include>
<include>org.eclipse.jetty:*</include>
<include>org.eclipse.jetty.websocket:*</include>
<include>org.rocksdb:rocksdbjni</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>org.apache.httpcomponents:fluent-hc</include>
<include>org.antlr:stringtemplate</include>
<include>commons-io:commons-io</include>
<include>commons-logging:commons-logging</include>
<include>org.apache.parquet:parquet-avro</include>
<include>com.twitter:bijection-avro_2.11</include>
<include>com.twitter:bijection-core_2.11</include>
<include>org.apache.parquet:parquet-avro</include>
<include>com.twitter:parquet-avro</include>
<include>com.twitter.common:objectsize</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:common-config</include>
<include>io.confluent:common-utils</include>
@@ -263,10 +274,6 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<!-- Spark -->
<dependency>

36
pom.xml
View File

@@ -147,6 +147,7 @@
<scalatest.version>3.0.1</scalatest.version>
<surefire-log4j.file>file://${project.basedir}/src/test/resources/log4j-surefire.properties</surefire-log4j.file>
<thrift.version>0.12.0</thrift.version>
<jetty.version>7.6.0.v20120127</jetty.version>
<hbase.version>1.2.3</hbase.version>
<codehaus-jackson.version>1.9.13</codehaus-jackson.version>
<notice.dir>${project.basedir}</notice.dir>
@@ -524,7 +525,7 @@
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<scope>test</scope>
<version>7.6.0.v20120127</version>
<version>${jetty.version}</version>
</dependency>
<!-- Avro -->
@@ -545,34 +546,6 @@
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>${parquet.version}</version>
</dependency>
<!-- Parquet (Twitter) -->
<!-- Spark parquet version 1.7.0 does not play well with the hive 1.1.0 installed in cluster (which requires twitter parquet 1.5.0) -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.6.0</version>
</dependency>
<!-- Spark -->
<dependency>
@@ -675,11 +648,6 @@
<artifactId>commons-pool</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Httpcomponents -->
<dependency>