[HUDI-1718] Querying the incremental view of a MOR table that has multi-level partitions fails (#2716)
This commit is contained in:
@@ -170,7 +170,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
|
|||||||
if (job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").isEmpty()) {
|
if (job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").isEmpty()) {
|
||||||
List<String> partitions = new ArrayList<>(part.getPartSpec().keySet());
|
List<String> partitions = new ArrayList<>(part.getPartSpec().keySet());
|
||||||
if (!partitions.isEmpty()) {
|
if (!partitions.isEmpty()) {
|
||||||
String partitionStr = String.join(",", partitions);
|
String partitionStr = String.join("/", partitions);
|
||||||
LOG.info("Setting Partitions in jobConf - Partition Keys for Path : " + path + " is :" + partitionStr);
|
LOG.info("Setting Partitions in jobConf - Partition Keys for Path : " + path + " is :" + partitionStr);
|
||||||
job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, partitionStr);
|
job.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, partitionStr);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.hadoop.functional;
|
package org.apache.hudi.hadoop.functional;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||||
@@ -26,6 +27,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
|
|||||||
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
||||||
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
|
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
|
||||||
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
|
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
|
||||||
|
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||||
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
@@ -51,6 +53,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
|
||||||
import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
|
import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
|
||||||
@@ -84,6 +87,75 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
|
|||||||
HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
|
||||||
|
// test for HUDI-1718
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// initial commit
|
||||||
|
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
|
||||||
|
HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
||||||
|
String commitTime = "100";
|
||||||
|
final int numRecords = 1000;
|
||||||
|
// Create 3 parquet files with 1000 records each
|
||||||
|
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
|
||||||
|
InputFormatTestUtil.commit(tempDir, commitTime);
|
||||||
|
|
||||||
|
TableDesc tblDesc = Utilities.defaultTd;
|
||||||
|
// Set the input format
|
||||||
|
tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
|
||||||
|
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
|
||||||
|
LinkedHashMap<Path, ArrayList<String>> talias = new LinkedHashMap<>();
|
||||||
|
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
|
||||||
|
// add three-level partition info
|
||||||
|
partSpec.put("year", "2016");
|
||||||
|
partSpec.put("month", "05");
|
||||||
|
partSpec.put("day", "01");
|
||||||
|
PartitionDesc partDesc = new PartitionDesc(tblDesc, partSpec);
|
||||||
|
|
||||||
|
pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
|
||||||
|
|
||||||
|
ArrayList<String> arrayList = new ArrayList<>();
|
||||||
|
arrayList.add(tempDir.toAbsolutePath().toString());
|
||||||
|
talias.put(new Path(tempDir.toAbsolutePath().toString()), arrayList);
|
||||||
|
|
||||||
|
MapredWork mrwork = new MapredWork();
|
||||||
|
mrwork.getMapWork().setPathToPartitionInfo(pt);
|
||||||
|
mrwork.getMapWork().setPathToAliases(talias);
|
||||||
|
|
||||||
|
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
|
||||||
|
Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
|
||||||
|
jobConf = new JobConf(conf);
|
||||||
|
// Add the paths
|
||||||
|
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||||
|
jobConf.set(HAS_MAP_WORK, "true");
|
||||||
|
// The following config tells Hive to choose ExecMapper to read the MAP_WORK
|
||||||
|
jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
|
||||||
|
// set the max split size to 128000000 bytes so that all 3 file groups are combined into one split
|
||||||
|
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
|
||||||
|
|
||||||
|
HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
|
||||||
|
String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
|
||||||
|
InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
|
||||||
|
// set META_TABLE_PARTITION_COLUMNS to empty so it is derived from the partition spec, which triggers HUDI-1718
|
||||||
|
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
|
||||||
|
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
|
||||||
|
// Since SPLIT_MAXSIZE is large, we should create only 1 split with all 3 file groups
|
||||||
|
assertEquals(1, splits.length);
|
||||||
|
|
||||||
|
// if HUDI-1718 is not fixed, the following code will throw an exception
|
||||||
|
RecordReader<NullWritable, ArrayWritable> recordReader =
|
||||||
|
combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
|
||||||
|
NullWritable nullWritable = recordReader.createKey();
|
||||||
|
ArrayWritable arrayWritable = recordReader.createValue();
|
||||||
|
int counter = 0;
|
||||||
|
while (recordReader.next(nullWritable, arrayWritable)) {
|
||||||
|
// read over all the splits
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
// should read out all 3 files (file0, file1, file2) from the single combined split, containing 1000 records each
|
||||||
|
assertEquals(3000, counter);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Disabled
|
@Disabled
|
||||||
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
|
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
|
||||||
|
|||||||
Reference in New Issue
Block a user