[HUDI-2632] Schema evolution for flink parquet reader (#3872)
This commit is contained in:
@@ -110,10 +110,14 @@ public class FormatUtils {
|
|||||||
List<Schema.Field> requiredFields = requiredSchema.getFields();
|
List<Schema.Field> requiredFields = requiredSchema.getFields();
|
||||||
assert (requiredFields.size() == requiredPos.length);
|
assert (requiredFields.size() == requiredPos.length);
|
||||||
Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
|
Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
|
||||||
requiredFields.forEach(f -> recordBuilder.set(f, record.get(positionIterator.next())));
|
requiredFields.forEach(f -> recordBuilder.set(f, getVal(record, positionIterator.next())));
|
||||||
return recordBuilder.build();
|
return recordBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Object getVal(IndexedRecord record, int pos) {
|
||||||
|
return pos == -1 ? null : record.get(pos);
|
||||||
|
}
|
||||||
|
|
||||||
public static HoodieMergedLogRecordScanner logScanner(
|
public static HoodieMergedLogRecordScanner logScanner(
|
||||||
MergeOnReadInputSplit split,
|
MergeOnReadInputSplit split,
|
||||||
Schema logSchema,
|
Schema logSchema,
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
|
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
|
||||||
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
|
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
|
||||||
@@ -67,6 +68,8 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
|
|
||||||
private final MessageType fileSchema;
|
private final MessageType fileSchema;
|
||||||
|
|
||||||
|
private final LogicalType[] requestedTypes;
|
||||||
|
|
||||||
private final MessageType requestedSchema;
|
private final MessageType requestedSchema;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -81,8 +84,6 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
|
|
||||||
private final ColumnarRowData row;
|
private final ColumnarRowData row;
|
||||||
|
|
||||||
private final LogicalType[] selectedTypes;
|
|
||||||
|
|
||||||
private final int batchSize;
|
private final int batchSize;
|
||||||
|
|
||||||
private ParquetFileReader reader;
|
private ParquetFileReader reader;
|
||||||
@@ -121,7 +122,6 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
long splitStart,
|
long splitStart,
|
||||||
long splitLength) throws IOException {
|
long splitLength) throws IOException {
|
||||||
this.utcTimestamp = utcTimestamp;
|
this.utcTimestamp = utcTimestamp;
|
||||||
this.selectedTypes = selectedTypes;
|
|
||||||
this.batchSize = batchSize;
|
this.batchSize = batchSize;
|
||||||
// then we need to apply the predicate push down filter
|
// then we need to apply the predicate push down filter
|
||||||
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
|
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
|
||||||
@@ -130,7 +130,13 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
|
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
|
||||||
|
|
||||||
this.fileSchema = footer.getFileMetaData().getSchema();
|
this.fileSchema = footer.getFileMetaData().getSchema();
|
||||||
this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
|
|
||||||
|
Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
|
||||||
|
int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
|
||||||
|
Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);
|
||||||
|
|
||||||
|
this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
|
||||||
|
this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
|
||||||
this.reader = new ParquetFileReader(
|
this.reader = new ParquetFileReader(
|
||||||
conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
|
conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
|
||||||
|
|
||||||
@@ -146,23 +152,37 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
checkSchema();
|
checkSchema();
|
||||||
|
|
||||||
this.writableVectors = createWritableVectors();
|
this.writableVectors = createWritableVectors();
|
||||||
this.columnarBatch = generator.generate(createReadableVectors());
|
ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
|
||||||
|
this.columnarBatch = generator.generate(columnVectors);
|
||||||
this.row = new ColumnarRowData(columnarBatch);
|
this.row = new ColumnarRowData(columnarBatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Patches the given vectors with nulls.
|
||||||
|
* The vector position that is not requested (or read from file) is patched as null.
|
||||||
|
*
|
||||||
|
* @param fields The total selected fields number
|
||||||
|
* @param vectors The readable vectors
|
||||||
|
* @param indices The requested indices from the selected fields
|
||||||
|
*/
|
||||||
|
private static ColumnVector[] patchedVector(int fields, ColumnVector[] vectors, int[] indices) {
|
||||||
|
ColumnVector[] patched = new ColumnVector[fields];
|
||||||
|
for (int i = 0; i < indices.length; i++) {
|
||||||
|
patched[indices[i]] = vectors[i];
|
||||||
|
}
|
||||||
|
return patched;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clips `parquetSchema` according to `fieldNames`.
|
* Clips `parquetSchema` according to `fieldNames`.
|
||||||
*/
|
*/
|
||||||
private static MessageType clipParquetSchema(
|
private static Type[] clipParquetSchema(
|
||||||
GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) {
|
GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) {
|
||||||
Type[] types = new Type[fieldNames.length];
|
Type[] types = new Type[fieldNames.length];
|
||||||
if (caseSensitive) {
|
if (caseSensitive) {
|
||||||
for (int i = 0; i < fieldNames.length; ++i) {
|
for (int i = 0; i < fieldNames.length; ++i) {
|
||||||
String fieldName = fieldNames[i];
|
String fieldName = fieldNames[i];
|
||||||
if (parquetSchema.getFieldIndex(fieldName) < 0) {
|
types[i] = parquetSchema.containsField(fieldName) ? parquetSchema.getType(fieldName) : null;
|
||||||
throw new IllegalArgumentException(fieldName + " does not exist");
|
|
||||||
}
|
|
||||||
types[i] = parquetSchema.getType(fieldName);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
|
Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
|
||||||
@@ -178,23 +198,20 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
}
|
}
|
||||||
for (int i = 0; i < fieldNames.length; ++i) {
|
for (int i = 0; i < fieldNames.length; ++i) {
|
||||||
Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT));
|
Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT));
|
||||||
if (type == null) {
|
|
||||||
throw new IllegalArgumentException(fieldNames[i] + " does not exist");
|
|
||||||
}
|
|
||||||
// TODO clip for array,map,row types.
|
// TODO clip for array,map,row types.
|
||||||
types[i] = type;
|
types[i] = type;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Types.buildMessage().addFields(types).named("flink-parquet");
|
return types;
|
||||||
}
|
}
|
||||||
|
|
||||||
private WritableColumnVector[] createWritableVectors() {
|
private WritableColumnVector[] createWritableVectors() {
|
||||||
WritableColumnVector[] columns = new WritableColumnVector[selectedTypes.length];
|
WritableColumnVector[] columns = new WritableColumnVector[requestedTypes.length];
|
||||||
for (int i = 0; i < selectedTypes.length; i++) {
|
for (int i = 0; i < requestedTypes.length; i++) {
|
||||||
columns[i] = createWritableColumnVector(
|
columns[i] = createWritableColumnVector(
|
||||||
batchSize,
|
batchSize,
|
||||||
selectedTypes[i],
|
requestedTypes[i],
|
||||||
requestedSchema.getColumns().get(i).getPrimitiveType());
|
requestedSchema.getColumns().get(i).getPrimitiveType());
|
||||||
}
|
}
|
||||||
return columns;
|
return columns;
|
||||||
@@ -207,7 +224,7 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
private ColumnVector[] createReadableVectors() {
|
private ColumnVector[] createReadableVectors() {
|
||||||
ColumnVector[] vectors = new ColumnVector[writableVectors.length];
|
ColumnVector[] vectors = new ColumnVector[writableVectors.length];
|
||||||
for (int i = 0; i < writableVectors.length; i++) {
|
for (int i = 0; i < writableVectors.length; i++) {
|
||||||
vectors[i] = selectedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL
|
vectors[i] = requestedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL
|
||||||
? new ParquetDecimalVector(writableVectors[i])
|
? new ParquetDecimalVector(writableVectors[i])
|
||||||
: writableVectors[i];
|
: writableVectors[i];
|
||||||
}
|
}
|
||||||
@@ -215,10 +232,6 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void checkSchema() throws IOException, UnsupportedOperationException {
|
private void checkSchema() throws IOException, UnsupportedOperationException {
|
||||||
if (selectedTypes.length != requestedSchema.getFieldCount()) {
|
|
||||||
throw new RuntimeException("The quality of field type is incompatible with the request schema!");
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check that the requested schema is supported.
|
* Check that the requested schema is supported.
|
||||||
*/
|
*/
|
||||||
@@ -314,7 +327,7 @@ public class ParquetColumnarRowSplitReader implements Closeable {
|
|||||||
for (int i = 0; i < columns.size(); ++i) {
|
for (int i = 0; i < columns.size(); ++i) {
|
||||||
columnReaders[i] = createColumnReader(
|
columnReaders[i] = createColumnReader(
|
||||||
utcTimestamp,
|
utcTimestamp,
|
||||||
selectedTypes[i],
|
requestedTypes[i],
|
||||||
columns.get(i),
|
columns.get(i),
|
||||||
pages.getPageReader(columns.get(i)));
|
pages.getPageReader(columns.get(i)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,9 +108,7 @@ public class ParquetSplitReaderUtil {
|
|||||||
for (int i = 0; i < vectors.length; i++) {
|
for (int i = 0; i < vectors.length; i++) {
|
||||||
String name = fullFieldNames[selectedFields[i]];
|
String name = fullFieldNames[selectedFields[i]];
|
||||||
LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
|
LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
|
||||||
vectors[i] = partitionSpec.containsKey(name)
|
vectors[i] = createVector(readVectors, selNonPartNames, name, type, partitionSpec, batchSize);
|
||||||
? createVectorFromConstant(type, partitionSpec.get(name), batchSize)
|
|
||||||
: readVectors[selNonPartNames.indexOf(name)];
|
|
||||||
}
|
}
|
||||||
return new VectorizedColumnBatch(vectors);
|
return new VectorizedColumnBatch(vectors);
|
||||||
};
|
};
|
||||||
@@ -130,6 +128,24 @@ public class ParquetSplitReaderUtil {
|
|||||||
splitLength);
|
splitLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ColumnVector createVector(
|
||||||
|
ColumnVector[] readVectors,
|
||||||
|
List<String> selNonPartNames,
|
||||||
|
String name,
|
||||||
|
LogicalType type,
|
||||||
|
Map<String, Object> partitionSpec,
|
||||||
|
int batchSize) {
|
||||||
|
if (partitionSpec.containsKey(name)) {
|
||||||
|
return createVectorFromConstant(type, partitionSpec.get(name), batchSize);
|
||||||
|
}
|
||||||
|
ColumnVector readVector = readVectors[selNonPartNames.indexOf(name)];
|
||||||
|
if (readVector == null) {
|
||||||
|
// when the read vector is null, use a constant null vector instead
|
||||||
|
readVector = createVectorFromConstant(type, null, batchSize);
|
||||||
|
}
|
||||||
|
return readVector;
|
||||||
|
}
|
||||||
|
|
||||||
private static ColumnVector createVectorFromConstant(
|
private static ColumnVector createVectorFromConstant(
|
||||||
LogicalType type,
|
LogicalType type,
|
||||||
Object value,
|
Object value,
|
||||||
|
|||||||
@@ -960,6 +960,44 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
|
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = HoodieTableType.class)
|
||||||
|
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
|
||||||
|
TableEnvironment tableEnv = batchTableEnv;
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||||
|
conf.setString(FlinkOptions.TABLE_NAME, "t1");
|
||||||
|
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
|
||||||
|
|
||||||
|
// write a batch of data set
|
||||||
|
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||||
|
|
||||||
|
String hoodieTableDDL = sql("t1")
|
||||||
|
.field("uuid varchar(20)")
|
||||||
|
.field("name varchar(10)")
|
||||||
|
.field("age int")
|
||||||
|
.field("salary double")
|
||||||
|
.field("ts timestamp(3)")
|
||||||
|
.field("`partition` varchar(10)")
|
||||||
|
.pkField("uuid")
|
||||||
|
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||||
|
.option(FlinkOptions.TABLE_TYPE, tableType)
|
||||||
|
.end();
|
||||||
|
tableEnv.executeSql(hoodieTableDDL);
|
||||||
|
|
||||||
|
List<Row> result = CollectionUtil.iterableToList(
|
||||||
|
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||||
|
final String expected = "["
|
||||||
|
+ "+I[id1, Danny, 23, null, 1970-01-01T00:00:00.001, par1], "
|
||||||
|
+ "+I[id2, Stephen, 33, null, 1970-01-01T00:00:00.002, par1], "
|
||||||
|
+ "+I[id3, Julian, 53, null, 1970-01-01T00:00:00.003, par2], "
|
||||||
|
+ "+I[id4, Fabian, 31, null, 1970-01-01T00:00:00.004, par2], "
|
||||||
|
+ "+I[id5, Sophia, 18, null, 1970-01-01T00:00:00.005, par3], "
|
||||||
|
+ "+I[id6, Emma, 20, null, 1970-01-01T00:00:00.006, par3], "
|
||||||
|
+ "+I[id7, Bob, 44, null, 1970-01-01T00:00:00.007, par4], "
|
||||||
|
+ "+I[id8, Han, 56, null, 1970-01-01T00:00:00.008, par4]]";
|
||||||
|
assertRowsEquals(result, expected);
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions;
|
|||||||
import org.apache.hudi.table.HoodieTableSource;
|
import org.apache.hudi.table.HoodieTableSource;
|
||||||
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
|
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
|
||||||
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
|
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
|
||||||
|
import org.apache.hudi.util.AvroSchemaConverter;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
import org.apache.hudi.utils.TestConfigurations;
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
import org.apache.hudi.utils.TestData;
|
import org.apache.hudi.utils.TestData;
|
||||||
@@ -445,6 +446,20 @@ public class TestInputFormat {
|
|||||||
TestData.assertRowDataEquals(actual4, expected4);
|
TestData.assertRowDataEquals(actual4, expected4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(value = HoodieTableType.class)
|
||||||
|
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
|
||||||
|
Map<String, String> options = new HashMap<>();
|
||||||
|
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
|
||||||
|
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString());
|
||||||
|
beforeEach(tableType, options);
|
||||||
|
|
||||||
|
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||||
|
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
|
||||||
|
List<RowData> result = readData(inputFormat);
|
||||||
|
TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT);
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -63,6 +63,17 @@ public class TestConfigurations {
|
|||||||
private static final List<String> FIELDS = ROW_TYPE.getFields().stream()
|
private static final List<String> FIELDS = ROW_TYPE.getFields().stream()
|
||||||
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());
|
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());
|
||||||
|
|
||||||
|
public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW(
|
||||||
|
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
|
||||||
|
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
|
||||||
|
DataTypes.FIELD("age", DataTypes.INT()),
|
||||||
|
DataTypes.FIELD("salary", DataTypes.DOUBLE()),
|
||||||
|
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
|
||||||
|
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
|
||||||
|
.notNull();
|
||||||
|
|
||||||
|
public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
|
||||||
|
|
||||||
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
|
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
|
||||||
return getCreateHoodieTableDDL(tableName, options, true, "partition");
|
return getCreateHoodieTableDDL(tableName, options, true, "partition");
|
||||||
}
|
}
|
||||||
@@ -92,8 +103,9 @@ public class TestConfigurations {
|
|||||||
if (havePartition) {
|
if (havePartition) {
|
||||||
builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n");
|
builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n");
|
||||||
}
|
}
|
||||||
|
final String connector = options.computeIfAbsent("connector", k -> "hudi");
|
||||||
builder.append("with (\n"
|
builder.append("with (\n"
|
||||||
+ " 'connector' = 'hudi'");
|
+ " 'connector' = '").append(connector).append("'");
|
||||||
options.forEach((k, v) -> builder.append(",\n")
|
options.forEach((k, v) -> builder.append(",\n")
|
||||||
.append(" '").append(k).append("' = '").append(v).append("'"));
|
.append(" '").append(k).append("' = '").append(v).append("'"));
|
||||||
builder.append("\n)");
|
builder.append("\n)");
|
||||||
@@ -235,6 +247,11 @@ public class TestConfigurations {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Sql options(Map<String, String> options) {
|
||||||
|
this.options.putAll(options);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Sql noPartition() {
|
public Sql noPartition() {
|
||||||
this.withPartition = false;
|
this.withPartition = false;
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
Reference in New Issue
Block a user