[HUDI-4171] Fixing Non partitioned with virtual keys in read path (#5747)
- When Non partitioned key gen is used with virtual keys, read path could break since partition path may not exist.
This commit is contained in:
committed by
GitHub
parent
21b903fddb
commit
7da97c8096
@@ -103,10 +103,19 @@ public class HoodieTestUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
|
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
|
||||||
HoodieFileFormat baseFileFormat)
|
HoodieFileFormat baseFileFormat) throws IOException {
|
||||||
|
return init(hadoopConf, basePath, tableType, baseFileFormat, false, null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
|
||||||
|
HoodieFileFormat baseFileFormat, boolean setKeyGen, String keyGenerator, boolean populateMetaFields)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
|
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
|
||||||
|
if (setKeyGen) {
|
||||||
|
properties.setProperty("hoodie.datasource.write.keygenerator.class", keyGenerator);
|
||||||
|
}
|
||||||
|
properties.setProperty("hoodie.populate.meta.fields", Boolean.toString(populateMetaFields));
|
||||||
return init(hadoopConf, basePath, tableType, properties);
|
return init(hadoopConf, basePath, tableType, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.CollectionUtils;
|
import org.apache.hudi.common.util.CollectionUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
|
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
|
||||||
@@ -275,16 +276,16 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
|
|||||||
if (tableConfig.populateMetaFields()) {
|
if (tableConfig.populateMetaFields()) {
|
||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
|
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
|
||||||
try {
|
try {
|
||||||
Schema schema = tableSchemaResolver.getTableAvroSchema();
|
Schema schema = tableSchemaResolver.getTableAvroSchema();
|
||||||
|
boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
|
||||||
return Option.of(
|
return Option.of(
|
||||||
new HoodieVirtualKeyInfo(
|
new HoodieVirtualKeyInfo(
|
||||||
tableConfig.getRecordKeyFieldProp(),
|
tableConfig.getRecordKeyFieldProp(),
|
||||||
tableConfig.getPartitionFieldProp(),
|
isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
|
||||||
schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
|
schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
|
||||||
schema.getField(tableConfig.getPartitionFieldProp()).pos()));
|
isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
throw new HoodieException("Fetching table schema failed with exception ", exception);
|
throw new HoodieException("Fetching table schema failed with exception ", exception);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.hadoop.realtime;
|
package org.apache.hudi.hadoop.realtime;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -26,11 +28,11 @@ import java.io.Serializable;
|
|||||||
public class HoodieVirtualKeyInfo implements Serializable {
|
public class HoodieVirtualKeyInfo implements Serializable {
|
||||||
|
|
||||||
private final String recordKeyField;
|
private final String recordKeyField;
|
||||||
private final String partitionPathField;
|
private final Option<String> partitionPathField;
|
||||||
private final int recordKeyFieldIndex;
|
private final int recordKeyFieldIndex;
|
||||||
private final int partitionPathFieldIndex;
|
private final Option<Integer> partitionPathFieldIndex;
|
||||||
|
|
||||||
public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) {
|
public HoodieVirtualKeyInfo(String recordKeyField, Option<String> partitionPathField, int recordKeyFieldIndex, Option<Integer> partitionPathFieldIndex) {
|
||||||
this.recordKeyField = recordKeyField;
|
this.recordKeyField = recordKeyField;
|
||||||
this.partitionPathField = partitionPathField;
|
this.partitionPathField = partitionPathField;
|
||||||
this.recordKeyFieldIndex = recordKeyFieldIndex;
|
this.recordKeyFieldIndex = recordKeyFieldIndex;
|
||||||
@@ -41,7 +43,7 @@ public class HoodieVirtualKeyInfo implements Serializable {
|
|||||||
return recordKeyField;
|
return recordKeyField;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPartitionPathField() {
|
public Option<String> getPartitionPathField() {
|
||||||
return partitionPathField;
|
return partitionPathField;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,7 +51,7 @@ public class HoodieVirtualKeyInfo implements Serializable {
|
|||||||
return recordKeyFieldIndex;
|
return recordKeyFieldIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getPartitionPathFieldIndex() {
|
public Option<Integer> getPartitionPathFieldIndex() {
|
||||||
return partitionPathFieldIndex;
|
return partitionPathFieldIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,9 +59,9 @@ public class HoodieVirtualKeyInfo implements Serializable {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return "HoodieVirtualKeyInfo{"
|
return "HoodieVirtualKeyInfo{"
|
||||||
+ "recordKeyField='" + recordKeyField + '\''
|
+ "recordKeyField='" + recordKeyField + '\''
|
||||||
+ ", partitionPathField='" + partitionPathField + '\''
|
+ ", partitionPathField='" + (partitionPathField.isPresent() ? partitionPathField.get() : "null") + '\''
|
||||||
+ ", recordKeyFieldIndex=" + recordKeyFieldIndex
|
+ ", recordKeyFieldIndex=" + recordKeyFieldIndex
|
||||||
+ ", partitionPathFieldIndex=" + partitionPathFieldIndex
|
+ ", partitionPathFieldIndex=" + (partitionPathFieldIndex.isPresent() ? partitionPathFieldIndex.get() : "-1")
|
||||||
+ '}';
|
+ '}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -107,9 +107,12 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
|
|||||||
} else {
|
} else {
|
||||||
InputSplitUtils.writeBoolean(true, out);
|
InputSplitUtils.writeBoolean(true, out);
|
||||||
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
|
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
|
||||||
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out);
|
|
||||||
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
|
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
|
||||||
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
|
InputSplitUtils.writeBoolean(virtualKeyInfoOpt.get().getPartitionPathField().isPresent(), out);
|
||||||
|
if (virtualKeyInfoOpt.get().getPartitionPathField().isPresent()) {
|
||||||
|
InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField().get(), out);
|
||||||
|
InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,9 +133,10 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
|
|||||||
boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
|
boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
|
||||||
if (hoodieVirtualKeyPresent) {
|
if (hoodieVirtualKeyPresent) {
|
||||||
String recordKeyField = InputSplitUtils.readString(in);
|
String recordKeyField = InputSplitUtils.readString(in);
|
||||||
String partitionPathField = InputSplitUtils.readString(in);
|
|
||||||
int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
|
int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
|
||||||
int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
|
boolean isPartitionPathFieldPresent = InputSplitUtils.readBoolean(in);
|
||||||
|
Option<String> partitionPathField = isPartitionPathFieldPresent ? Option.of(InputSplitUtils.readString(in)) : Option.empty();
|
||||||
|
Option<Integer> partitionPathIndex = isPartitionPathFieldPresent ? Option.of(Integer.parseInt(InputSplitUtils.readString(in))) : Option.empty();
|
||||||
setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
|
setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,7 +87,9 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
|||||||
} else {
|
} else {
|
||||||
HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
|
HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
|
||||||
addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
|
addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
|
||||||
addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
|
if (hoodieVirtualKey.getPartitionPathField().isPresent()) {
|
||||||
|
addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,7 +101,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
|
|||||||
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
|
&& readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
|
||||||
} else {
|
} else {
|
||||||
return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
|
return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
|
||||||
&& readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
|
&& (hoodieVirtualKeyInfo.get().getPartitionPathField().isPresent() ? readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField().get())
|
||||||
|
: true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,12 +27,15 @@ import org.apache.hadoop.mapred.InputSplit;
|
|||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.RecordReader;
|
import org.apache.hadoop.mapred.RecordReader;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -40,6 +43,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|||||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||||
|
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
||||||
|
import org.apache.hudi.common.util.CommitUtils;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
|
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
|
||||||
@@ -55,6 +61,7 @@ import java.io.IOException;
|
|||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
|
||||||
@@ -167,6 +174,26 @@ public class TestHoodieParquetInputFormat {
|
|||||||
assertEquals(10, files.length);
|
assertEquals(10, files.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInputFormatLoadForNonPartitionedAndVirtualKeyedTable() throws IOException {
|
||||||
|
// initial commit
|
||||||
|
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
|
||||||
|
File partitionDir = InputFormatTestUtil.prepareCustomizedTable(basePath, baseFileFormat, 10, "100", true, false,
|
||||||
|
true, schema);
|
||||||
|
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
|
||||||
|
schema.toString(), HoodieTimeline.COMMIT_ACTION);
|
||||||
|
FileCreateUtils.createCommit(basePath.toString(), "100", Option.of(commitMetadata));
|
||||||
|
|
||||||
|
// Add the paths
|
||||||
|
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
|
||||||
|
|
||||||
|
InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
|
||||||
|
assertEquals(10, inputSplits.length);
|
||||||
|
|
||||||
|
FileStatus[] files = inputFormat.listStatus(jobConf);
|
||||||
|
assertEquals(10, files.length);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInputFormatLoadWithEmptyTable() throws IOException {
|
public void testInputFormatLoadWithEmptyTable() throws IOException {
|
||||||
// initial hoodie table
|
// initial hoodie table
|
||||||
|
|||||||
@@ -71,16 +71,35 @@ public class InputFormatTestUtil {
|
|||||||
private static String TEST_WRITE_TOKEN = "1-0-1";
|
private static String TEST_WRITE_TOKEN = "1-0-1";
|
||||||
|
|
||||||
public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
|
public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
|
||||||
String commitNumber)
|
String commitNumber) throws IOException {
|
||||||
|
return prepareCustomizedTable(basePath, baseFileFormat, numberOfFiles, commitNumber, false, true, false, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
|
||||||
|
String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
|
if (useNonPartitionedKeyGen) {
|
||||||
baseFileFormat);
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
|
||||||
|
baseFileFormat, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", populateMetaFields);
|
||||||
|
} else {
|
||||||
|
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
|
||||||
|
baseFileFormat);
|
||||||
|
}
|
||||||
|
|
||||||
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
|
java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
|
||||||
setupPartition(basePath, partitionPath);
|
setupPartition(basePath, partitionPath);
|
||||||
|
|
||||||
return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
|
if (injectData) {
|
||||||
commitNumber);
|
try {
|
||||||
|
createSimpleData(schema, partitionPath, numberOfFiles, 100, commitNumber);
|
||||||
|
return partitionPath.toFile();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException("Excpetion thrown while writing data ", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
|
||||||
|
commitNumber);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
|
public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
|
||||||
|
|||||||
Reference in New Issue
Block a user