diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index f378f4413..ef3d4f1c8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -147,4 +147,12 @@ public abstract class AbstractRealtimeRecordReader {
   public Schema getHiveSchema() {
     return hiveSchema;
   }
+
+  public RealtimeSplit getSplit() {
+    return split;
+  }
+
+  public JobConf getJobConf() {
+    return jobConf;
+  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
index 615d95ee5..914e698fa 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
@@ -71,6 +72,9 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
     } else if (recordReaders.size() > 0) {
       this.currentRecordReader.close();
       this.currentRecordReader = recordReaders.remove(0);
+      AbstractRealtimeRecordReader reader = (AbstractRealtimeRecordReader) currentRecordReader.getReader();
+      // when switching readers, the IOContext input path must be updated as well
+      IOContextMap.get(reader.getJobConf()).setInputPath(reader.getSplit().getPath());
       return next(key, value);
     } else {
       return false;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
index 1e3a25ac7..1cd18cf97 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -103,4 +103,8 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  public RecordReader<NullWritable, ArrayWritable> getReader() {
+    return this.reader;
+  }
 }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
index 5ec32d715..50c3f2e1c 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/functional/TestHoodieCombineHiveInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.hadoop.functional;
 
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -27,6 +28,8 @@
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
 import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
+import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
+import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
@@ -45,6 +48,7 @@
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -55,6 +59,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
 import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
@@ -87,6 +93,87 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
     HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
   }
 
+  @Test
+  public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
+    // test for HUDI-1718
+    Configuration conf = new Configuration();
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    final int numRecords = 1000;
+    // create 3 partitions, each partition holds one parquet file with 1000 records
+    List<File> partitionDirs = InputFormatTestUtil
+        .prepareMultiPartitionedParquetTable(tempDir, schema, 3, numRecords, commitTime);
+    InputFormatTestUtil.commit(tempDir, commitTime);
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    // set the input format
+    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
+    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> talias = new LinkedHashMap<>();
+
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+
+    pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
+
+    ArrayList<String> arrayList = new ArrayList<>();
+    arrayList.add(tempDir.toAbsolutePath().toString());
+    talias.put(new Path(tempDir.toAbsolutePath().toString()), arrayList);
+
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    mrwork.getMapWork().setPathToAliases(talias);
+
+    Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+    Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
+    jobConf = new JobConf(conf);
+    // add all three partition paths to the input paths
+    Path[] partitionDirArray = new Path[partitionDirs.size()];
+    partitionDirs.stream().map(p -> new Path(p.getPath())).collect(Collectors.toList()).toArray(partitionDirArray);
+    FileInputFormat.setInputPaths(jobConf, partitionDirArray);
+    jobConf.set(HAS_MAP_WORK, "true");
+    // the following config tells Hive to choose ExecMapper to read the MAP_WORK
+    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+    // set SPLIT_MAXSIZE large enough to combine all 3 file groups into one split
+    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
+
+    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
+    String tripsHiveColumnTypes =
"double,string,string,string,double,double,double,double,double"; + InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes); + + InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1); + // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups + assertEquals(1, splits.length); + + RecordReader recordReader = + combineHiveInputFormat.getRecordReader(splits[0], jobConf, null); + NullWritable nullWritable = recordReader.createKey(); + ArrayWritable arrayWritable = recordReader.createValue(); + int counter = 0; + + HoodieCombineRealtimeHiveSplit hiveSplit = (HoodieCombineRealtimeHiveSplit)splits[0]; + HoodieCombineRealtimeFileSplit fileSplit = (HoodieCombineRealtimeFileSplit)hiveSplit.getInputSplitShim(); + List realtimeFileSplits = fileSplit.getRealtimeFileSplits(); + + while (recordReader.next(nullWritable, arrayWritable)) { + // since each file holds 1000 records, when counter % 1000 == 0, + // HoodieCombineRealtimeRecordReader will switch reader internal + // Hive use ioctx to extract partition info, when switch reader, ioctx should be updated. + if (counter < 1000) { + assertEquals(IOContextMap.get(jobConf).getInputPath().toString(), realtimeFileSplits.get(0).getPath().toString()); + } else if (counter < 2000) { + assertEquals(IOContextMap.get(jobConf).getInputPath().toString(), realtimeFileSplits.get(1).getPath().toString()); + } else { + assertEquals(IOContextMap.get(jobConf).getInputPath().toString(), realtimeFileSplits.get(2).getPath().toString()); + } + counter++; + } + // should read out 3 splits, each for file0, file1, file2 containing 1000 records each + assertEquals(3000, counter); + recordReader.close(); + } + @Test public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception { // test for HUDI-1718 @@ -154,6 +241,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { } // should read out 3 splits, each for file0, file1, file2 containing 1000 records each assertEquals(3000, counter); + recordReader.close(); } @Test diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index f49ce0624..d10ccfca9 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -203,6 +203,18 @@ public class InputFormatTestUtil { return basePath.toFile(); } + public static List prepareMultiPartitionedParquetTable(java.nio.file.Path basePath, Schema schema, + int numberPartitions, int numberOfRecordsPerPartition, String commitNumber) throws IOException { + List result = new ArrayList<>(); + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString()); + for (int i = 0; i < numberPartitions; i++) { + java.nio.file.Path partitionPath = basePath.resolve(Paths.get(2016 + i + "", "05", "01")); + createData(schema, partitionPath, 1, numberOfRecordsPerPartition, commitNumber); + result.add(partitionPath.toFile()); + } + return result; + } + private static void createData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException { AvroParquetWriter parquetWriter;