diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 94fbe028b..2c3318362 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -110,10 +110,14 @@ public class FormatUtils { List requiredFields = requiredSchema.getFields(); assert (requiredFields.size() == requiredPos.length); Iterator positionIterator = Arrays.stream(requiredPos).iterator(); - requiredFields.forEach(f -> recordBuilder.set(f, record.get(positionIterator.next()))); + requiredFields.forEach(f -> recordBuilder.set(f, getVal(record, positionIterator.next()))); return recordBuilder.build(); } + private static Object getVal(IndexedRecord record, int pos) { + return pos == -1 ? null : record.get(pos); + } + public static HoodieMergedLogRecordScanner logScanner( MergeOnReadInputSplit split, Schema logSchema, diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java index 415469695..64eb1f485 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.stream.IntStream; import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader; import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector; @@ -67,6 +68,8 @@ public class ParquetColumnarRowSplitReader implements Closeable { private final MessageType fileSchema; + private final LogicalType[] requestedTypes; + private final MessageType requestedSchema; /** @@ -81,8 +84,6 @@ public class ParquetColumnarRowSplitReader implements Closeable { private final ColumnarRowData row; - private final LogicalType[] selectedTypes; - private final int batchSize; private ParquetFileReader reader; @@ -121,7 +122,6 @@ public class ParquetColumnarRowSplitReader implements Closeable { long splitStart, long splitLength) throws IOException { this.utcTimestamp = utcTimestamp; - this.selectedTypes = selectedTypes; this.batchSize = batchSize; // then we need to apply the predicate push down filter ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength)); @@ -130,7 +130,13 @@ public class ParquetColumnarRowSplitReader implements Closeable { List blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); this.fileSchema = footer.getFileMetaData().getSchema(); - this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive); + + Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive); + int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray(); + Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new); + + this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new); + this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet"); this.reader = new ParquetFileReader( conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns()); @@ -146,23 +152,37 @@ public class ParquetColumnarRowSplitReader implements Closeable { checkSchema(); this.writableVectors = createWritableVectors(); - this.columnarBatch = generator.generate(createReadableVectors()); + ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices); + this.columnarBatch = generator.generate(columnVectors); this.row = new ColumnarRowData(columnarBatch); } + /** + * Patches the given vectors with nulls. + * The vector position that is not requested (or read from file) is patched as null. + * + * @param fields The total selected fields number + * @param vectors The readable vectors + * @param indices The requested indices from the selected fields + */ + private static ColumnVector[] patchedVector(int fields, ColumnVector[] vectors, int[] indices) { + ColumnVector[] patched = new ColumnVector[fields]; + for (int i = 0; i < indices.length; i++) { + patched[indices[i]] = vectors[i]; + } + return patched; + } + /** * Clips `parquetSchema` according to `fieldNames`. */ - private static MessageType clipParquetSchema( + private static Type[] clipParquetSchema( GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) { Type[] types = new Type[fieldNames.length]; if (caseSensitive) { for (int i = 0; i < fieldNames.length; ++i) { String fieldName = fieldNames[i]; - if (parquetSchema.getFieldIndex(fieldName) < 0) { - throw new IllegalArgumentException(fieldName + " does not exist"); - } - types[i] = parquetSchema.getType(fieldName); + types[i] = parquetSchema.containsField(fieldName) ? parquetSchema.getType(fieldName) : null; } } else { Map caseInsensitiveFieldMap = new HashMap<>(); @@ -178,23 +198,20 @@ public class ParquetColumnarRowSplitReader implements Closeable { } for (int i = 0; i < fieldNames.length; ++i) { Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT)); - if (type == null) { - throw new IllegalArgumentException(fieldNames[i] + " does not exist"); - } // TODO clip for array,map,row types. types[i] = type; } } - return Types.buildMessage().addFields(types).named("flink-parquet"); + return types; } private WritableColumnVector[] createWritableVectors() { - WritableColumnVector[] columns = new WritableColumnVector[selectedTypes.length]; - for (int i = 0; i < selectedTypes.length; i++) { + WritableColumnVector[] columns = new WritableColumnVector[requestedTypes.length]; + for (int i = 0; i < requestedTypes.length; i++) { columns[i] = createWritableColumnVector( batchSize, - selectedTypes[i], + requestedTypes[i], requestedSchema.getColumns().get(i).getPrimitiveType()); } return columns; @@ -207,7 +224,7 @@ public class ParquetColumnarRowSplitReader implements Closeable { private ColumnVector[] createReadableVectors() { ColumnVector[] vectors = new ColumnVector[writableVectors.length]; for (int i = 0; i < writableVectors.length; i++) { - vectors[i] = selectedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL + vectors[i] = requestedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL ? new ParquetDecimalVector(writableVectors[i]) : writableVectors[i]; } @@ -215,10 +232,6 @@ public class ParquetColumnarRowSplitReader implements Closeable { } private void checkSchema() throws IOException, UnsupportedOperationException { - if (selectedTypes.length != requestedSchema.getFieldCount()) { - throw new RuntimeException("The quality of field type is incompatible with the request schema!"); - } - /* * Check that the requested schema is supported. */ @@ -314,7 +327,7 @@ public class ParquetColumnarRowSplitReader implements Closeable { for (int i = 0; i < columns.size(); ++i) { columnReaders[i] = createColumnReader( utcTimestamp, - selectedTypes[i], + requestedTypes[i], columns.get(i), pages.getPageReader(columns.get(i))); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index 63b679d44..6bb514b42 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -108,9 +108,7 @@ public class ParquetSplitReaderUtil { for (int i = 0; i < vectors.length; i++) { String name = fullFieldNames[selectedFields[i]]; LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType(); - vectors[i] = partitionSpec.containsKey(name) - ? createVectorFromConstant(type, partitionSpec.get(name), batchSize) - : readVectors[selNonPartNames.indexOf(name)]; + vectors[i] = createVector(readVectors, selNonPartNames, name, type, partitionSpec, batchSize); } return new VectorizedColumnBatch(vectors); }; @@ -130,6 +128,24 @@ public class ParquetSplitReaderUtil { splitLength); } + private static ColumnVector createVector( + ColumnVector[] readVectors, + List selNonPartNames, + String name, + LogicalType type, + Map partitionSpec, + int batchSize) { + if (partitionSpec.containsKey(name)) { + return createVectorFromConstant(type, partitionSpec.get(name), batchSize); + } + ColumnVector readVector = readVectors[selNonPartNames.indexOf(name)]; + if (readVector == null) { + // when the read vector is null, use a constant null vector instead + readVector = createVectorFromConstant(type, null, batchSize); + } + return readVector; + } + private static ColumnVector createVectorFromConstant( LogicalType type, Object value, diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 4e7b3bb3d..2d500ad39 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -960,6 +960,44 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, TestData.dataSetInsert(5, 6)); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testReadWithWiderSchema(HoodieTableType tableType) throws Exception { + TableEnvironment tableEnv = batchTableEnv; + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); + + // write a batch of data set + TestData.writeData(TestData.DATA_SET_INSERT, conf); + + String hoodieTableDDL = sql("t1") + .field("uuid varchar(20)") + .field("name varchar(10)") + .field("age int") + .field("salary double") + .field("ts timestamp(3)") + .field("`partition` varchar(10)") + .pkField("uuid") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + final String expected = "[" + + "+I[id1, Danny, 23, null, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 33, null, 1970-01-01T00:00:00.002, par1], " + + "+I[id3, Julian, 53, null, 1970-01-01T00:00:00.003, par2], " + + "+I[id4, Fabian, 31, null, 1970-01-01T00:00:00.004, par2], " + + "+I[id5, Sophia, 18, null, 1970-01-01T00:00:00.005, par3], " + + "+I[id6, Emma, 20, null, 1970-01-01T00:00:00.006, par3], " + + "+I[id7, Bob, 44, null, 1970-01-01T00:00:00.007, par4], " + + "+I[id8, Han, 56, null, 1970-01-01T00:00:00.008, par4]]"; + assertRowsEquals(result, expected); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index d4692059c..f4da947f3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -25,6 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieTableSource; import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -445,6 +446,20 @@ public class TestInputFormat { TestData.assertRowDataEquals(actual4, expected4); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testReadWithWiderSchema(HoodieTableType tableType) throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), + AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString()); + beforeEach(tableType, options); + + TestData.writeData(TestData.DATA_SET_INSERT, conf); + InputFormat inputFormat = this.tableSource.getInputFormat(); + List result = readData(inputFormat); + TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index f9824426a..0eafb1281 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -63,6 +63,17 @@ public class TestConfigurations { private static final List FIELDS = ROW_TYPE.getFields().stream() .map(RowType.RowField::asSummaryString).collect(Collectors.toList()); + public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW( + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("salary", DataTypes.DOUBLE()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType(); + public static String getCreateHoodieTableDDL(String tableName, Map options) { return getCreateHoodieTableDDL(tableName, options, true, "partition"); } @@ -92,8 +103,9 @@ public class TestConfigurations { if (havePartition) { builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n"); } + final String connector = options.computeIfAbsent("connector", k -> "hudi"); builder.append("with (\n" - + " 'connector' = 'hudi'"); + + " 'connector' = '").append(connector).append("'"); options.forEach((k, v) -> builder.append(",\n") .append(" '").append(k).append("' = '").append(v).append("'")); builder.append("\n)"); @@ -235,6 +247,11 @@ public class TestConfigurations { return this; } + public Sql options(Map options) { + this.options.putAll(options); + return this; + } + public Sql noPartition() { this.withPartition = false; return this;