diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index 356ae96da..7173c9a8f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -170,7 +170,7 @@ public class HoodieCombineHiveInputFormat partitions = new ArrayList<>(part.getPartSpec().keySet()); if (!partitions.isEmpty()) { - String partitionStr = String.join(",", partitions); + String partitionStr = String.join("/", partitions); LOG.info("Setting Partitions in jobConf - Partition Keys for Path : " + path + " is :" + partitionStr); job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, partitionStr); } else { diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java index 5150d6abc..a5b193126 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop.functional; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -26,6 +27,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import 
org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.avro.Schema; @@ -51,6 +53,7 @@ import org.junit.jupiter.api.Test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.LinkedHashMap; import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK; @@ -84,6 +87,75 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); } + @Test + public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception { + // test for HUDI-1718 + Configuration conf = new Configuration(); + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ); + String commitTime = "100"; + final int numRecords = 1000; + // Create 3 parquet files with 1000 records each + File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime); + InputFormatTestUtil.commit(tempDir, commitTime); + + TableDesc tblDesc = Utilities.defaultTd; + // Set the input format + tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class); + LinkedHashMap pt = new LinkedHashMap<>(); + LinkedHashMap> talias = new LinkedHashMap<>(); + LinkedHashMap partSpec = new LinkedHashMap<>(); + // add three level partitions info + partSpec.put("year", "2016"); + partSpec.put("month", "05"); + partSpec.put("day", "01"); + PartitionDesc partDesc = new PartitionDesc(tblDesc, partSpec); + + pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc); + + ArrayList arrayList = new ArrayList<>(); + arrayList.add(tempDir.toAbsolutePath().toString()); + talias.put(new Path(tempDir.toAbsolutePath().toString()), arrayList); + + MapredWork mrwork = new MapredWork(); + 
mrwork.getMapWork().setPathToPartitionInfo(pt); + mrwork.getMapWork().setPathToAliases(talias); + + Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); + Utilities.setMapRedWork(conf, mrwork, mapWorkPath); + jobConf = new JobConf(conf); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + jobConf.set(HAS_MAP_WORK, "true"); + // The following config tells Hive to choose ExecMapper to read the MAP_WORK + jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); + // setting the max split size large enough to combine all 3 file groups into one split + jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000"); + + HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat(); + String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double"; + InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + // unset META_TABLE_PARTITION_COLUMNS to trigger HUDI-1718 + jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); + InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1); + // Since the max split size covers all the files, we should create only 1 split with all 3 file groups + assertEquals(1, splits.length); + + // if HUDI-1718 is not fixed, the following code will throw an exception + RecordReader recordReader = + combineHiveInputFormat.getRecordReader(splits[0], jobConf, null); + NullWritable nullWritable = recordReader.createKey(); + ArrayWritable arrayWritable = recordReader.createValue(); + int counter = 0; + while (recordReader.next(nullWritable, arrayWritable)) { + // read over all the splits + counter++; + } + // should read out 3000 records in total, 1000 each from file0, file1 and file2 + assertEquals(3000, counter); + } + @Test @Disabled public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {