HUDI-515 Resolve API conflict for Hive 2 & Hive 3
This commit is contained in:
@@ -70,6 +70,7 @@ import org.apache.log4j.Logger;
|
|||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -137,7 +138,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
Set<Path> poolSet = new HashSet<>();
|
Set<Path> poolSet = new HashSet<>();
|
||||||
|
|
||||||
for (Path path : paths) {
|
for (Path path : paths) {
|
||||||
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, path,
|
PartitionDesc part = getPartitionFromPath(pathToPartitionInfo, path,
|
||||||
IOPrepareCache.get().allocatePartitionDescMap());
|
IOPrepareCache.get().allocatePartitionDescMap());
|
||||||
TableDesc tableDesc = part.getTableDesc();
|
TableDesc tableDesc = part.getTableDesc();
|
||||||
if ((tableDesc != null) && tableDesc.isNonNative()) {
|
if ((tableDesc != null) && tableDesc.isNonNative()) {
|
||||||
@@ -375,6 +376,34 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
iss.addAll(Arrays.asList(combine.getSplits(currJob, 1)));
|
iss.addAll(Arrays.asList(combine.getSplits(currJob, 1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HiveFileFormatUtils.getPartitionDescFromPathRecursively is no longer available since Hive 3.
|
||||||
|
* This method is to make it compatible with both Hive 2 and Hive 3.
|
||||||
|
* @param pathToPartitionInfo
|
||||||
|
* @param dir
|
||||||
|
* @param cacheMap
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private static PartitionDesc getPartitionFromPath(Map<Path, PartitionDesc> pathToPartitionInfo, Path dir,
|
||||||
|
Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> cacheMap)
|
||||||
|
throws IOException {
|
||||||
|
Method method;
|
||||||
|
try {
|
||||||
|
Class<?> hiveUtilsClass = Class.forName("org.apache.hadoop.hive.ql.io.HiveFileFormatUtils");
|
||||||
|
try {
|
||||||
|
// HiveFileFormatUtils.getPartitionDescFromPathRecursively method only available in Hive 2.x
|
||||||
|
method = hiveUtilsClass.getMethod("getPartitionDescFromPathRecursively", Map.class, Path.class, Map.class);
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
// HiveFileFormatUtils.getFromPathRecursively method only available in Hive 3.x
|
||||||
|
method = hiveUtilsClass.getMethod("getFromPathRecursively", Map.class, Path.class, Map.class);
|
||||||
|
}
|
||||||
|
return (PartitionDesc) method.invoke(null, pathToPartitionInfo, dir, cacheMap);
|
||||||
|
} catch (ReflectiveOperationException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MOD - Just added this for visibility.
|
* MOD - Just added this for visibility.
|
||||||
*/
|
*/
|
||||||
@@ -568,8 +597,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
// CombinedSplit.
|
// CombinedSplit.
|
||||||
Path[] ipaths = inputSplitShim.getPaths();
|
Path[] ipaths = inputSplitShim.getPaths();
|
||||||
if (ipaths.length > 0) {
|
if (ipaths.length > 0) {
|
||||||
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(this.pathToPartitionInfo,
|
PartitionDesc part = getPartitionFromPath(this.pathToPartitionInfo, ipaths[0],
|
||||||
ipaths[0], IOPrepareCache.get().getPartitionDescMap());
|
IOPrepareCache.get().getPartitionDescMap());
|
||||||
inputFormatClassName = part.getInputFileFormatClass().getName();
|
inputFormatClassName = part.getInputFileFormatClass().getName();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -703,8 +732,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
|
|
||||||
// extract all the inputFormatClass names for each chunk in the
|
// extract all the inputFormatClass names for each chunk in the
|
||||||
// CombinedSplit.
|
// CombinedSplit.
|
||||||
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
|
PartitionDesc part = getPartitionFromPath(pathToPartitionInfo, inputSplitShim.getPath(0),
|
||||||
inputSplitShim.getPath(0), IOPrepareCache.get().getPartitionDescMap());
|
IOPrepareCache.get().getPartitionDescMap());
|
||||||
|
|
||||||
// create a new InputFormat instance if this is the first time to see
|
// create a new InputFormat instance if this is the first time to see
|
||||||
// this class
|
// this class
|
||||||
@@ -957,8 +986,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
public Set<Integer> call() throws Exception {
|
public Set<Integer> call() throws Exception {
|
||||||
Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
|
Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
|
||||||
for (int i = 0; i < length; i++) {
|
for (int i = 0; i < length; i++) {
|
||||||
PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo,
|
PartitionDesc part = getPartitionFromPath(pathToPartitionInfo, paths[i + start],
|
||||||
paths[i + start], IOPrepareCache.get().allocatePartitionDescMap());
|
IOPrepareCache.get().allocatePartitionDescMap());
|
||||||
// Use HiveInputFormat if any of the paths is not splittable
|
// Use HiveInputFormat if any of the paths is not splittable
|
||||||
Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
|
Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
|
||||||
InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, conf);
|
InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, conf);
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
|
|||||||
import org.apache.hadoop.hive.metastore.HiveMetaStore;
|
import org.apache.hadoop.hive.metastore.HiveMetaStore;
|
||||||
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
||||||
import org.apache.hadoop.hive.metastore.IHMSHandler;
|
import org.apache.hadoop.hive.metastore.IHMSHandler;
|
||||||
|
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
|
||||||
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
|
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
|
||||||
import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
|
import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
|
||||||
import org.apache.hadoop.hive.metastore.api.MetaException;
|
import org.apache.hadoop.hive.metastore.api.MetaException;
|
||||||
@@ -286,7 +287,8 @@ public class HiveTestService {
|
|||||||
TProcessor processor;
|
TProcessor processor;
|
||||||
TTransportFactory transFactory;
|
TTransportFactory transFactory;
|
||||||
|
|
||||||
IHMSHandler handler = (IHMSHandler) HiveMetaStore.newRetryingHMSHandler("new db based metaserver", conf, true);
|
HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, false);
|
||||||
|
IHMSHandler handler = RetryingHMSHandler.getProxy(conf, baseHandler, true);
|
||||||
|
|
||||||
if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
|
if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
|
||||||
transFactory = useFramedTransport
|
transFactory = useFramedTransport
|
||||||
|
|||||||
Reference in New Issue
Block a user