1
0

[HUDI-2177][HUDI-2200] Adding virtual keys support for MOR table (#3315)

This commit is contained in:
Sivabalan Narayanan
2021-08-02 09:45:09 -04:00
committed by GitHub
parent dde57b293c
commit fe508376fa
37 changed files with 633 additions and 261 deletions

View File

@@ -45,6 +45,14 @@ public class InputSplitUtils {
return new String(bytes, StandardCharsets.UTF_8);
}
/**
 * Serializes the given flag to {@code out} as a single boolean.
 *
 * @param valueToWrite flag to serialize (boxed; a {@code null} would fail on unboxing)
 * @param out target output to write to
 * @throws IOException if the underlying stream write fails
 */
public static void writeBoolean(Boolean valueToWrite, DataOutput out) throws IOException {
  out.writeBoolean(valueToWrite.booleanValue());
}
/**
 * Reads back a single boolean previously written via {@link #writeBoolean}.
 *
 * @param in source input to read from
 * @return the deserialized boolean value
 * @throws IOException if the underlying stream read fails
 */
public static boolean readBoolean(DataInput in) throws IOException {
return in.readBoolean();
}
/**
* Return correct base-file schema based on split.
*

View File

@@ -18,9 +18,14 @@
package org.apache.hudi.hadoop.realtime;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -31,16 +36,13 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieHFileInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;
/**
* HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format.
*/
@@ -90,7 +92,7 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
// time.
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty());
this.conf = jobConf;
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
@@ -36,7 +37,6 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -100,7 +100,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
// time.
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
}
this.conf = jobConf;
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.mapred.FileSplit;
import java.io.DataInput;
@@ -36,16 +38,20 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
private String basePath;
private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
public HoodieRealtimeFileSplit() {
super();
}
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime)
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime,
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo)
throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogPaths = deltaLogPaths;
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
}
public List<String> getDeltaLogPaths() {
@@ -60,6 +66,16 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit
return basePath;
}
/**
 * Sets the virtual key info for this split (used when meta fields are disabled).
 */
@Override
public void setHoodieVirtualKeyInfo(Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
}
/**
 * Returns the virtual key info for this split; empty unless it was set explicitly
 * or supplied at construction time.
 */
@Override
public Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo() {
return hoodieVirtualKeyInfo;
}
/**
 * Replaces the delta log file paths backing this split.
 */
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

View File

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop.realtime;
import java.io.Serializable;
/**
* Class to hold virtual key info when meta fields are disabled.
*/
/**
 * Immutable holder for virtual key information when Hudi meta fields are disabled:
 * the record key and partition path field names, together with their positional
 * indexes in the table schema, so readers can locate key columns in the data
 * columns themselves instead of the meta columns.
 */
public class HoodieVirtualKeyInfo implements Serializable {

  // Explicit serialVersionUID so serialized instances (e.g. shipped inside input
  // splits) remain compatible across refactors of this class.
  private static final long serialVersionUID = 1L;

  private final String recordKeyField;
  private final String partitionPathField;
  private final int recordKeyFieldIndex;
  private final int partitionPathFieldIndex;

  /**
   * @param recordKeyField name of the record key field in the data schema
   * @param partitionPathField name of the partition path field in the data schema
   * @param recordKeyFieldIndex positional index of the record key field
   * @param partitionPathFieldIndex positional index of the partition path field
   */
  public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) {
    this.recordKeyField = recordKeyField;
    this.partitionPathField = partitionPathField;
    this.recordKeyFieldIndex = recordKeyFieldIndex;
    this.partitionPathFieldIndex = partitionPathFieldIndex;
  }

  public String getRecordKeyField() {
    return recordKeyField;
  }

  public String getPartitionPathField() {
    return partitionPathField;
  }

  public int getRecordKeyFieldIndex() {
    return recordKeyFieldIndex;
  }

  public int getPartitionPathFieldIndex() {
    return partitionPathFieldIndex;
  }

  @Override
  public String toString() {
    return "HoodieVirtualKeyInfo{"
        + "recordKeyField='" + recordKeyField + '\''
        + ", partitionPathField='" + partitionPathField + '\''
        + ", recordKeyFieldIndex=" + recordKeyFieldIndex
        + ", partitionPathFieldIndex=" + partitionPathFieldIndex
        + '}';
  }
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hadoop.mapred.FileSplit;
@@ -77,6 +78,11 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
return basePath;
}
/**
 * Always empty: bootstrap base-file splits do not carry virtual key info
 * (the corresponding setter on this class is a no-op as well).
 */
@Override
public Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo() {
return Option.empty();
}
@Override
public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
@@ -91,4 +97,8 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple
/**
 * Sets the Hudi table base path for this split.
 */
public void setBasePath(String basePath) {
this.basePath = basePath;
}
@Override
// Intentional no-op: bootstrap splits do not track virtual key info, and
// getHoodieVirtualKeyInfo() on this class correspondingly always returns empty.
public void setHoodieVirtualKeyInfo(Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {}
}

View File

@@ -53,6 +53,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
private final Set<String> deltaRecordKeys;
private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
private Iterator<String> deltaItr;
public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
@@ -61,6 +62,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
this.parquetReader = realReader;
this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
if (split.getHoodieVirtualKeyInfo().isPresent()) {
this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex();
}
}
/**
@@ -102,7 +106,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
// with a new block of values
while (this.parquetReader.next(aVoid, arrayWritable)) {
if (!deltaRecordMap.isEmpty()) {
String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
String key = arrayWritable.get()[recordKeyIndex].toString();
if (deltaRecordMap.containsKey(key)) {
// mark the key as handled
this.deltaRecordKeys.remove(key);

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.InputSplitUtils;
import org.apache.hadoop.fs.Path;
@@ -52,8 +53,15 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
*/
String getBasePath();
/**
* Returns Virtual key info if meta fields are disabled.
* @return {@code Option} holding the virtual key info when meta fields are disabled; empty otherwise
*/
Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo();
/**
* Update Log File Paths.
*
* @param deltaLogPaths updated list of delta log file paths for this split
*/
void setDeltaLogPaths(List<String> deltaLogPaths);
@@ -70,6 +78,8 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
*/
void setBasePath(String basePath);
void setHoodieVirtualKeyInfo(Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo);
default void writeToOutput(DataOutput out) throws IOException {
InputSplitUtils.writeString(getBasePath(), out);
InputSplitUtils.writeString(getMaxCommitTime(), out);
@@ -77,6 +87,15 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
for (String logFilePath : getDeltaLogPaths()) {
InputSplitUtils.writeString(logFilePath, out);
}
if (!getHoodieVirtualKeyInfo().isPresent()) {
InputSplitUtils.writeBoolean(false, out);
} else {
InputSplitUtils.writeBoolean(true, out);
InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getRecordKeyField(), out);
InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getPartitionPathField(), out);
InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex()), out);
InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getPartitionPathFieldIndex()), out);
}
}
default void readFromInput(DataInput in) throws IOException {
@@ -88,6 +107,14 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
deltaLogPaths.add(InputSplitUtils.readString(in));
}
setDeltaLogPaths(deltaLogPaths);
boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
if (hoodieVirtualKeyPresent) {
String recordKeyField = InputSplitUtils.readString(in);
String partitionPathField = InputSplitUtils.readString(in);
int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
setHoodieVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
}
}
/**

View File

@@ -18,14 +18,15 @@
package org.apache.hudi.hadoop.utils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -37,21 +38,24 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -77,6 +81,24 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
// grouped on file id
List<InputSplit> rtSplits = new ArrayList<>();
try {
// Pre process tableConfig from first partition to fetch virtual key info
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
if (partitionsToParquetSplits.size() > 0) {
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next());
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (!tableConfig.populateMetaFields()) {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
try {
MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
hoodieVirtualKeyInfo = Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
} catch (Exception exception) {
throw new HoodieException("Fetching table schema failed with exception ", exception);
}
}
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
@@ -121,7 +143,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(),
logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit()));
} else {
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime));
rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo));
}
} catch (IOException e) {
throw new HoodieIOException("Error creating hoodie real time split ", e);
@@ -173,7 +195,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
});
return baseAndLogsList;
}
/**
* Add a field to the existing fields projected.
@@ -204,23 +226,34 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
return conf;
}
public static void addRequiredProjectionFields(Configuration configuration) {
public static void addRequiredProjectionFields(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
// Need this to do merge records in HoodieRealtimeRecordReader
addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
if (!hoodieVirtualKeyInfo.isPresent()) {
addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS);
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS);
} else {
HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
}
}
public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) {
public static boolean requiredProjectionFieldsExistInConf(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
&& readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
if (!hoodieVirtualKeyInfo.isPresent()) {
return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)
&& readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
} else {
return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
&& readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
}
}
public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
|| (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf));
|| (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getHoodieVirtualKeyInfo()));
}
/**