[HUDI-1722]Fix hive beeline/spark-sql query specified field on mor table occur NPE (#2722)
This commit is contained in:
@@ -85,12 +85,12 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
|
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
|
||||||
// latency incurred here due to the synchronization since get record reader is called once per spilt before the
|
// latency incurred here due to the synchronization since get record reader is called once per spilt before the
|
||||||
// actual heavy lifting of reading the parquet files happen.
|
// actual heavy lifting of reading the parquet files happen.
|
||||||
if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
|
if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
|
||||||
synchronized (jobConf) {
|
synchronized (jobConf) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
||||||
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
||||||
if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
|
if (HoodieRealtimeInputFormatUtils.canAddProjectionToJobConf(realtimeSplit, jobConf)) {
|
||||||
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
|
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
|
||||||
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
|
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
|
||||||
// hoodie additional projection columns are reset after calling setConf and only natural projections
|
// hoodie additional projection columns are reset after calling setConf and only natural projections
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.hadoop.utils;
|
package org.apache.hudi.hadoop.utils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
|
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.FileSlice;
|
import org.apache.hudi.common.model.FileSlice;
|
||||||
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
|
|||||||
import org.apache.hadoop.mapred.FileSplit;
|
import org.apache.hadoop.mapred.FileSplit;
|
||||||
import org.apache.hadoop.mapred.InputSplit;
|
import org.apache.hadoop.mapred.InputSplit;
|
||||||
import org.apache.hadoop.mapred.SplitLocationInfo;
|
import org.apache.hadoop.mapred.SplitLocationInfo;
|
||||||
|
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -218,6 +220,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
|||||||
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
|
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) {
|
||||||
|
String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
|
||||||
|
return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
|
||||||
|
&& readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
|
||||||
|
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
|
||||||
|
return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
|
||||||
|
|| (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
|
* Hive will append read columns' ids to old columns' ids during getRecordReader. In some cases, e.g. SELECT COUNT(*),
|
||||||
* the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
|
* the read columns' id is an empty string and Hive will combine it with Hoodie required projection ids and becomes
|
||||||
|
|||||||
@@ -156,6 +156,73 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
|
|||||||
assertEquals(3000, counter);
|
assertEquals(3000, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMutilReaderRealtimeComineHoodieInputFormat() throws Exception {
|
||||||
|
// test for hudi-1722
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// initial commit
|
||||||
|
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
|
||||||
|
HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
|
||||||
|
String commitTime = "100";
|
||||||
|
final int numRecords = 1000;
|
||||||
|
// Create 3 parquet files with 1000 records each
|
||||||
|
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
|
||||||
|
InputFormatTestUtil.commit(tempDir, commitTime);
|
||||||
|
|
||||||
|
String newCommitTime = "101";
|
||||||
|
// to trigger the bug of HUDI-1772, only update fileid2
|
||||||
|
// insert 1000 update records to log file 2
|
||||||
|
// now fileid0, fileid1 has no log files, fileid2 has log file
|
||||||
|
HoodieLogFormat.Writer writer =
|
||||||
|
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
|
||||||
|
numRecords, numRecords, 0);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
TableDesc tblDesc = Utilities.defaultTd;
|
||||||
|
// Set the input format
|
||||||
|
tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
|
||||||
|
PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
|
||||||
|
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
|
||||||
|
LinkedHashMap<Path, ArrayList<String>> tableAlias = new LinkedHashMap<>();
|
||||||
|
ArrayList<String> alias = new ArrayList<>();
|
||||||
|
alias.add(tempDir.toAbsolutePath().toString());
|
||||||
|
tableAlias.put(new Path(tempDir.toAbsolutePath().toString()), alias);
|
||||||
|
pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
|
||||||
|
|
||||||
|
MapredWork mrwork = new MapredWork();
|
||||||
|
mrwork.getMapWork().setPathToPartitionInfo(pt);
|
||||||
|
mrwork.getMapWork().setPathToAliases(tableAlias);
|
||||||
|
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
|
||||||
|
Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
|
||||||
|
jobConf = new JobConf(conf);
|
||||||
|
// Add the paths
|
||||||
|
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||||
|
jobConf.set(HAS_MAP_WORK, "true");
|
||||||
|
// The following config tells Hive to choose ExecMapper to read the MAP_WORK
|
||||||
|
jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
|
||||||
|
// set SPLIT_MAXSIZE larger to create one split for 3 files groups
|
||||||
|
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
|
||||||
|
|
||||||
|
HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
|
||||||
|
String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
|
||||||
|
InputFormatTestUtil.setProjectFieldsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
|
||||||
|
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
|
||||||
|
// Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups
|
||||||
|
assertEquals(1, splits.length);
|
||||||
|
RecordReader<NullWritable, ArrayWritable> recordReader =
|
||||||
|
combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
|
||||||
|
NullWritable nullWritable = recordReader.createKey();
|
||||||
|
ArrayWritable arrayWritable = recordReader.createValue();
|
||||||
|
int counter = 0;
|
||||||
|
while (recordReader.next(nullWritable, arrayWritable)) {
|
||||||
|
// read over all the splits
|
||||||
|
counter++;
|
||||||
|
}
|
||||||
|
// should read out 3 splits, each for file0, file1, file2 containing 1000 records each
|
||||||
|
assertEquals(3000, counter);
|
||||||
|
recordReader.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Disabled
|
@Disabled
|
||||||
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
|
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
|
||||||
|
|||||||
@@ -326,6 +326,33 @@ public class InputFormatTestUtil {
|
|||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void setProjectFieldsForInputFormat(JobConf jobConf,
|
||||||
|
Schema schema, String hiveColumnTypes) {
|
||||||
|
List<Schema.Field> fields = schema.getFields();
|
||||||
|
String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
|
||||||
|
String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
|
||||||
|
Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
|
||||||
|
|
||||||
|
String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
|
||||||
|
.map(Schema.Field::name).collect(Collectors.joining(","));
|
||||||
|
hiveColumnNames = hiveColumnNames + ",datestr";
|
||||||
|
String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
|
||||||
|
modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
|
||||||
|
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
|
||||||
|
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
|
||||||
|
// skip choose hoodie meta_columns, only choose one origin column to trigger HUID-1722
|
||||||
|
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
|
||||||
|
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions.split(",")[5]);
|
||||||
|
jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
|
||||||
|
conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
|
||||||
|
// skip choose hoodie meta_columns, only choose one origin column to trigger HUID-1722
|
||||||
|
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names.split(",")[5]);
|
||||||
|
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions.split(",")[5]);
|
||||||
|
conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
|
||||||
|
conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes);
|
||||||
|
jobConf.addResource(conf);
|
||||||
|
}
|
||||||
|
|
||||||
public static void setPropsForInputFormat(JobConf jobConf,
|
public static void setPropsForInputFormat(JobConf jobConf,
|
||||||
Schema schema, String hiveColumnTypes) {
|
Schema schema, String hiveColumnTypes) {
|
||||||
List<Schema.Field> fields = schema.getFields();
|
List<Schema.Field> fields = schema.getFields();
|
||||||
|
|||||||
Reference in New Issue
Block a user