From 086853c004c6581e08744c5900210b6aaffd90e4 Mon Sep 17 00:00:00 2001
From: Satish Kotha
Date: Tue, 7 Jul 2020 13:30:58 -0700
Subject: [PATCH] [HUDI-1080] Fix backward compatibility for com.uber inputformats

---
Review notes (this section sits after the "---" separator and is not part of
the commit message):

* The new class com.uber.hoodie.hadoop.hive.HoodieCombineHiveInputFormat
  extends org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat and
  overrides three hooks:
    - getParquetInputFormatClassName() returns the name of
      com.uber.hoodie.hadoop.HoodieInputFormat;
    - getParquetRealtimeInputFormatClassName() returns the name of
      com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
    - createInputFormatShim() returns the nested
      HoodieCombineFileInputFormatShim, whose createParquetInputFormat() and
      createParquetRealtimeInputFormat() instantiate those two com.uber classes.
* In the org.apache.hudi base class the hoodie-filter check now compares
  inputFormatClass.getName() against the two get...ClassName() hooks instead of
  the hard-coded HoodieParquetInputFormat / HoodieParquetRealtimeInputFormat
  class names, and the combine shim is obtained through createInputFormatShim().
* NOTE(review): this copy of the patch had lost its angle-bracket content. The
  generic parameters on the two class declarations below were restored so that
  the diamond in "new HoodieCombineFileInputFormatShim<>()" is legal and the
  Writable / WritableComparable imports are used. The exact parameter names and
  bounds are inferred - confirm against the upstream commit.

 .../hive/HoodieCombineHiveInputFormat.java | 60 +++++++++++++++++++
 .../hive/HoodieCombineHiveInputFormat.java | 44 +++++++++-----
 2 files changed, 90 insertions(+), 14 deletions(-)
 create mode 100644 hudi-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java

diff --git a/hudi-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java
new file mode 100644
index 000000000..2220a10d0
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.hadoop.hive;
+
+import com.uber.hoodie.hadoop.HoodieInputFormat;
+import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+
+public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extends Writable>
+    extends org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat<K, V> {
+
+  @Override
+  protected String getParquetInputFormatClassName() {
+    return HoodieInputFormat.class.getName();
+  }
+
+  @Override
+  protected String getParquetRealtimeInputFormatClassName() {
+    return HoodieRealtimeInputFormat.class.getName();
+  }
+
+  @Override
+  protected org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim
+      createInputFormatShim() {
+    return new HoodieCombineFileInputFormatShim<>();
+  }
+
+  public static class HoodieCombineFileInputFormatShim<K, V>
+      extends org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim<K, V> {
+
+    @Override
+    protected HoodieParquetInputFormat createParquetInputFormat() {
+      return new HoodieInputFormat();
+    }
+
+    @Override
+    protected HoodieParquetRealtimeInputFormat createParquetRealtimeInputFormat() {
+      return new HoodieRealtimeInputFormat();
+    }
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index a88d1522b..356ae96da 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.hudi.hadoop.hive;
 
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import
org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieCombineRealtimeRecordReader; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; @@ -109,6 +108,18 @@ public class HoodieCombineHiveInputFormat(); + } + /** * Create Hive splits based on CombineFileSplit. */ @@ -118,8 +129,7 @@ public class HoodieCombineHiveInputFormat> pathToAliases = mrwork.getPathToAliases(); Map> aliasToWork = mrwork.getAliasToWork(); /* MOD - Initialize a custom combine input format shim that will call listStatus on the custom inputFormat **/ - HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim combine = - new HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim<>(); + HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim combine = createInputFormatShim(); InputSplit[] splits; @@ -151,9 +161,9 @@ public class HoodieCombineHiveInputFormat " + inputFormatClass.getName()); // **MOD** Set the hoodie filter in the combine - if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) { + if (inputFormatClass.getName().equals(getParquetInputFormatClassName())) { combine.setHoodieFilter(true); - } else if (inputFormatClass.getName().equals(HoodieParquetRealtimeInputFormat.class.getName())) { + } else if (inputFormatClass.getName().equals(getParquetRealtimeInputFormatClassName())) { LOG.info("Setting hoodie filter and realtime input format"); combine.setHoodieFilter(true); combine.setRealTime(true); @@ -540,16 +550,14 @@ public class HoodieCombineHiveInputFormat(Arrays.asList(input.listStatus(new JobConf(job.getConfiguration())))); @@ -901,7 +917,7 @@ public class HoodieCombineHiveInputFormat