[HUDI-1719] Hive on Spark/MR: partition field is incorrect in incremental queries on MOR tables (#2720)
This commit is contained in:
@@ -147,4 +147,12 @@ public abstract class AbstractRealtimeRecordReader {
|
|||||||
public Schema getHiveSchema() {
|
public Schema getHiveSchema() {
|
||||||
return hiveSchema;
|
return hiveSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RealtimeSplit getSplit() {
|
||||||
|
return split;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JobConf getJobConf() {
|
||||||
|
return jobConf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.common.util.ValidationUtils;
|
|||||||
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
|
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
|
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hive.ql.io.IOContextMap;
|
||||||
import org.apache.hadoop.io.ArrayWritable;
|
import org.apache.hadoop.io.ArrayWritable;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.mapred.InputSplit;
|
import org.apache.hadoop.mapred.InputSplit;
|
||||||
@@ -71,6 +72,9 @@ public class HoodieCombineRealtimeRecordReader implements RecordReader<NullWrita
|
|||||||
} else if (recordReaders.size() > 0) {
|
} else if (recordReaders.size() > 0) {
|
||||||
this.currentRecordReader.close();
|
this.currentRecordReader.close();
|
||||||
this.currentRecordReader = recordReaders.remove(0);
|
this.currentRecordReader = recordReaders.remove(0);
|
||||||
|
AbstractRealtimeRecordReader reader = (AbstractRealtimeRecordReader)currentRecordReader.getReader();
|
||||||
|
// When switching to the next reader, update the IOContext input path: Hive uses it to extract partition info
|
||||||
|
IOContextMap.get(reader.getJobConf()).setInputPath(reader.getSplit().getPath());
|
||||||
return next(key, value);
|
return next(key, value);
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -103,4 +103,8 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar
|
|||||||
public float getProgress() throws IOException {
|
public float getProgress() throws IOException {
|
||||||
return this.reader.getProgress();
|
return this.reader.getProgress();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RecordReader<NullWritable, ArrayWritable> getReader() {
|
||||||
|
return this.reader;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.hadoop.functional;
|
package org.apache.hudi.hadoop.functional;
|
||||||
|
|
||||||
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
|
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
|
||||||
|
import org.apache.hadoop.hive.ql.io.IOContextMap;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||||
@@ -27,6 +28,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
|
|||||||
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
||||||
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
|
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
|
||||||
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
|
import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
|
||||||
|
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
|
||||||
|
import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit;
|
||||||
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
|
||||||
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
||||||
|
|
||||||
@@ -45,6 +48,7 @@ import org.apache.hadoop.mapred.FileInputFormat;
|
|||||||
import org.apache.hadoop.mapred.InputSplit;
|
import org.apache.hadoop.mapred.InputSplit;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.RecordReader;
|
import org.apache.hadoop.mapred.RecordReader;
|
||||||
|
import org.apache.hadoop.mapred.FileSplit;
|
||||||
import org.junit.jupiter.api.AfterAll;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
@@ -55,6 +59,8 @@ import java.io.File;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
|
import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
|
||||||
import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
|
import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
|
||||||
@@ -87,6 +93,87 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
|
|||||||
HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
HoodieTestUtils.init(MiniClusterUtil.configuration, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void multiPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
|
||||||
|
// test for HUDI-1719
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// initial commit
|
||||||
|
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
|
||||||
|
HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
||||||
|
String commitTime = "100";
|
||||||
|
final int numRecords = 1000;
|
||||||
|
// Create 3 partitions, each partition holds one parquet file and 1000 records
|
||||||
|
List<File> partitionDirs = InputFormatTestUtil
|
||||||
|
.prepareMultiPartitionedParquetTable(tempDir, schema, 3, numRecords, commitTime);
|
||||||
|
InputFormatTestUtil.commit(tempDir, commitTime);
|
||||||
|
|
||||||
|
TableDesc tblDesc = Utilities.defaultTd;
|
||||||
|
// Set the input format
|
||||||
|
tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
|
||||||
|
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
|
||||||
|
LinkedHashMap<Path, ArrayList<String>> talias = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
|
||||||
|
|
||||||
|
pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
|
||||||
|
|
||||||
|
ArrayList<String> arrayList = new ArrayList<>();
|
||||||
|
arrayList.add(tempDir.toAbsolutePath().toString());
|
||||||
|
talias.put(new Path(tempDir.toAbsolutePath().toString()), arrayList);
|
||||||
|
|
||||||
|
MapredWork mrwork = new MapredWork();
|
||||||
|
mrwork.getMapWork().setPathToPartitionInfo(pt);
|
||||||
|
mrwork.getMapWork().setPathToAliases(talias);
|
||||||
|
|
||||||
|
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
|
||||||
|
Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
|
||||||
|
jobConf = new JobConf(conf);
|
||||||
|
// Add the three partition paths to the job's input paths
|
||||||
|
Path[] partitionDirArray = new Path[partitionDirs.size()];
|
||||||
|
partitionDirs.stream().map(p -> new Path(p.getPath())).collect(Collectors.toList()).toArray(partitionDirArray);
|
||||||
|
FileInputFormat.setInputPaths(jobConf, partitionDirArray);
|
||||||
|
jobConf.set(HAS_MAP_WORK, "true");
|
||||||
|
// The following config tells Hive to choose ExecMapper to read the MAP_WORK
|
||||||
|
jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
|
||||||
|
// set the max split size (128000000) large enough that all 3 file groups are combined into one split
|
||||||
|
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
|
||||||
|
|
||||||
|
HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
|
||||||
|
String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
|
||||||
|
InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
|
||||||
|
|
||||||
|
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
|
||||||
|
// Since the max split size is large enough to hold all 3 file groups, only 1 combined split should be created
|
||||||
|
assertEquals(1, splits.length);
|
||||||
|
|
||||||
|
RecordReader<NullWritable, ArrayWritable> recordReader =
|
||||||
|
combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
|
||||||
|
NullWritable nullWritable = recordReader.createKey();
|
||||||
|
ArrayWritable arrayWritable = recordReader.createValue();
|
||||||
|
int counter = 0;
|
||||||
|
|
||||||
|
HoodieCombineRealtimeHiveSplit hiveSplit = (HoodieCombineRealtimeHiveSplit)splits[0];
|
||||||
|
HoodieCombineRealtimeFileSplit fileSplit = (HoodieCombineRealtimeFileSplit)hiveSplit.getInputSplitShim();
|
||||||
|
List<FileSplit> realtimeFileSplits = fileSplit.getRealtimeFileSplits();
|
||||||
|
|
||||||
|
while (recordReader.next(nullWritable, arrayWritable)) {
|
||||||
|
// since each file holds 1000 records, when counter % 1000 == 0,
|
||||||
|
// HoodieCombineRealtimeRecordReader will switch to the next reader internally.
|
||||||
|
// Hive uses the ioctx to extract partition info, so the ioctx must be updated whenever the reader is switched.
|
||||||
|
if (counter < 1000) {
|
||||||
|
assertEquals(IOContextMap.get(jobConf).getInputPath().toString(), realtimeFileSplits.get(0).getPath().toString());
|
||||||
|
} else if (counter < 2000) {
|
||||||
|
assertEquals(IOContextMap.get(jobConf).getInputPath().toString(), realtimeFileSplits.get(1).getPath().toString());
|
||||||
|
} else {
|
||||||
|
assertEquals(IOContextMap.get(jobConf).getInputPath().toString(), realtimeFileSplits.get(2).getPath().toString());
|
||||||
|
}
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
// should read all 3 files (file0, file1, file2) from the single combined split, 1000 records each
|
||||||
|
assertEquals(3000, counter);
|
||||||
|
recordReader.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
|
public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws Exception {
|
||||||
// test for HUDI-1718
|
// test for HUDI-1718
|
||||||
@@ -154,6 +241,7 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
|
|||||||
}
|
}
|
||||||
// should read out 3 splits, each for file0, file1, file2 containing 1000 records each
|
// should read out 3 splits, each for file0, file1, file2 containing 1000 records each
|
||||||
assertEquals(3000, counter);
|
assertEquals(3000, counter);
|
||||||
|
recordReader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -203,6 +203,18 @@ public class InputFormatTestUtil {
|
|||||||
return basePath.toFile();
|
return basePath.toFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<File> prepareMultiPartitionedParquetTable(java.nio.file.Path basePath, Schema schema,
|
||||||
|
int numberPartitions, int numberOfRecordsPerPartition, String commitNumber) throws IOException {
|
||||||
|
List<File> result = new ArrayList<>();
|
||||||
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString());
|
||||||
|
for (int i = 0; i < numberPartitions; i++) {
|
||||||
|
java.nio.file.Path partitionPath = basePath.resolve(Paths.get(2016 + i + "", "05", "01"));
|
||||||
|
createData(schema, partitionPath, 1, numberOfRecordsPerPartition, commitNumber);
|
||||||
|
result.add(partitionPath.toFile());
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
private static void createData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords,
|
private static void createData(Schema schema, java.nio.file.Path partitionPath, int numberOfFiles, int numberOfRecords,
|
||||||
String commitNumber) throws IOException {
|
String commitNumber) throws IOException {
|
||||||
AvroParquetWriter parquetWriter;
|
AvroParquetWriter parquetWriter;
|
||||||
|
|||||||
Reference in New Issue
Block a user