diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index 1c8b988b0..3d9524eaa 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,19 +18,22 @@
package org.apache.hudi.io.storage.row.parquet;
+import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.Type;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -46,7 +49,8 @@ import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnRead
/**
* Writes a record to the Parquet API with the expected schema in order to be written to a file.
*
- * <p>Reference org.apache.flink.formats.parquet.row.ParquetRowDataWriter to support timestamp of INT64 8 bytes.
+ * <p>Reference {@code org.apache.flink.formats.parquet.row.ParquetRowDataWriter}
+ * to support timestamp of INT64 8 bytes and complex data types.
*/
public class ParquetRowDataWriter {
@@ -67,7 +71,7 @@ public class ParquetRowDataWriter {
this.filedWriters = new FieldWriter[rowType.getFieldCount()];
this.fieldNames = rowType.getFieldNames().toArray(new String[0]);
for (int i = 0; i < rowType.getFieldCount(); i++) {
- this.filedWriters[i] = createWriter(rowType.getTypeAt(i), schema.getType(i));
+ this.filedWriters[i] = createWriter(rowType.getTypeAt(i));
}
}
@@ -91,59 +95,75 @@ public class ParquetRowDataWriter {
recordConsumer.endMessage();
}
- private FieldWriter createWriter(LogicalType t, Type type) {
- if (type.isPrimitive()) {
- switch (t.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- return new StringWriter();
- case BOOLEAN:
- return new BooleanWriter();
- case BINARY:
- case VARBINARY:
- return new BinaryWriter();
- case DECIMAL:
- DecimalType decimalType = (DecimalType) t;
- return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale());
- case TINYINT:
- return new ByteWriter();
- case SMALLINT:
- return new ShortWriter();
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- case INTEGER:
- return new IntWriter();
- case BIGINT:
- return new LongWriter();
- case FLOAT:
- return new FloatWriter();
- case DOUBLE:
- return new DoubleWriter();
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- TimestampType timestampType = (TimestampType) t;
- if (timestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
- } else {
- return new Timestamp96Writer(timestampType.getPrecision());
- }
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
- if (localZonedTimestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
- } else {
- return new Timestamp96Writer(localZonedTimestampType.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Unsupported type: " + type);
- }
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + t);
+ private FieldWriter createWriter(LogicalType t) {
+ switch (t.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return new StringWriter();
+ case BOOLEAN:
+ return new BooleanWriter();
+ case BINARY:
+ case VARBINARY:
+ return new BinaryWriter();
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) t;
+ return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale());
+ case TINYINT:
+ return new ByteWriter();
+ case SMALLINT:
+ return new ShortWriter();
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTEGER:
+ return new IntWriter();
+ case BIGINT:
+ return new LongWriter();
+ case FLOAT:
+ return new FloatWriter();
+ case DOUBLE:
+ return new DoubleWriter();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ TimestampType timestampType = (TimestampType) t;
+ if (timestampType.getPrecision() == 3) {
+ return new Timestamp64Writer();
+ } else {
+ return new Timestamp96Writer(timestampType.getPrecision());
+ }
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
+ if (localZonedTimestampType.getPrecision() == 3) {
+ return new Timestamp64Writer();
+ } else {
+ return new Timestamp96Writer(localZonedTimestampType.getPrecision());
+ }
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) t;
+ LogicalType elementType = arrayType.getElementType();
+ FieldWriter elementWriter = createWriter(elementType);
+ return new ArrayWriter(elementWriter);
+ case MAP:
+ MapType mapType = (MapType) t;
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ FieldWriter keyWriter = createWriter(keyType);
+ FieldWriter valueWriter = createWriter(valueType);
+ return new MapWriter(keyWriter, valueWriter);
+ case ROW:
+ RowType rowType = (RowType) t;
+ FieldWriter[] fieldWriters = rowType.getFields().stream()
+ .map(RowType.RowField::getType).map(this::createWriter).toArray(FieldWriter[]::new);
+ String[] fieldNames = rowType.getFields().stream()
+ .map(RowType.RowField::getName).toArray(String[]::new);
+ return new RowWriter(fieldNames, fieldWriters);
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + t);
}
}
private interface FieldWriter {
-
void write(RowData row, int ordinal);
+
+ void write(ArrayData array, int ordinal);
}
private class BooleanWriter implements FieldWriter {
@@ -152,6 +172,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBoolean(row.getBoolean(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addBoolean(array.getBoolean(ordinal));
+ }
}
private class ByteWriter implements FieldWriter {
@@ -160,6 +185,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addInteger(row.getByte(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addInteger(array.getByte(ordinal));
+ }
}
private class ShortWriter implements FieldWriter {
@@ -168,6 +198,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addInteger(row.getShort(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addInteger(array.getShort(ordinal));
+ }
}
private class LongWriter implements FieldWriter {
@@ -176,6 +211,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addLong(row.getLong(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addLong(array.getLong(ordinal));
+ }
}
private class FloatWriter implements FieldWriter {
@@ -184,6 +224,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addFloat(row.getFloat(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addFloat(array.getFloat(ordinal));
+ }
}
private class DoubleWriter implements FieldWriter {
@@ -192,6 +237,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addDouble(row.getDouble(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addDouble(array.getDouble(ordinal));
+ }
}
private class StringWriter implements FieldWriter {
@@ -200,6 +250,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addBinary(Binary.fromReusedByteArray(array.getString(ordinal).toBytes()));
+ }
}
private class BinaryWriter implements FieldWriter {
@@ -208,6 +263,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addBinary(Binary.fromReusedByteArray(array.getBinary(ordinal)));
+ }
}
private class IntWriter implements FieldWriter {
@@ -216,6 +276,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addInteger(row.getInt(ordinal));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addInteger(array.getInt(ordinal));
+ }
}
/**
@@ -231,6 +296,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+ }
}
private long timestampToInt64(TimestampData timestampData) {
@@ -254,6 +324,11 @@ public class ParquetRowDataWriter {
public void write(RowData row, int ordinal) {
recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
}
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ recordConsumer.addBinary(timestampToInt96(array.getTimestamp(ordinal, precision)));
+ }
}
private Binary timestampToInt96(TimestampData timestampData) {
@@ -304,10 +379,20 @@ public class ParquetRowDataWriter {
@Override
public void write(RowData row, int ordinal) {
long unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong();
+ doWrite(unscaledLong);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ long unscaledLong = array.getDecimal(ordinal, precision, scale).toUnscaledLong();
+ doWrite(unscaledLong);
+ }
+
+ private void doWrite(long unscaled) {
int i = 0;
int shift = initShift;
while (i < numBytes) {
- decimalBuffer[i] = (byte) (unscaledLong >> shift);
+ decimalBuffer[i] = (byte) (unscaled >> shift);
i += 1;
shift -= 8;
}
@@ -328,6 +413,16 @@ public class ParquetRowDataWriter {
@Override
public void write(RowData row, int ordinal) {
byte[] bytes = row.getDecimal(ordinal, precision, scale).toUnscaledBytes();
+ doWrite(bytes);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ byte[] bytes = array.getDecimal(ordinal, precision, scale).toUnscaledBytes();
+ doWrite(bytes);
+ }
+
+ private void doWrite(byte[] bytes) {
byte[] writtenBytes;
if (bytes.length == numBytes) {
// Avoid copy.
@@ -353,5 +448,132 @@ public class ParquetRowDataWriter {
// 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
return new UnscaledBytesWriter();
}
+
+ private class ArrayWriter implements FieldWriter {
+ private final FieldWriter elementWriter;
+
+ private ArrayWriter(FieldWriter elementWriter) {
+ this.elementWriter = elementWriter;
+ }
+
+ @Override
+ public void write(RowData row, int ordinal) {
+ ArrayData arrayData = row.getArray(ordinal);
+ doWrite(arrayData);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ ArrayData arrayData = array.getArray(ordinal);
+ doWrite(arrayData);
+ }
+
+ private void doWrite(ArrayData arrayData) {
+ recordConsumer.startGroup();
+ if (arrayData.size() > 0) {
+ final String repeatedGroup = "list";
+ final String elementField = "element";
+ recordConsumer.startField(repeatedGroup, 0);
+ for (int i = 0; i < arrayData.size(); i++) {
+ recordConsumer.startGroup();
+ if (!arrayData.isNullAt(i)) {
+ // Only creates the element field if the current array element is not null.
+ recordConsumer.startField(elementField, 0);
+ elementWriter.write(arrayData, i);
+ recordConsumer.endField(elementField, 0);
+ }
+ recordConsumer.endGroup();
+ }
+ recordConsumer.endField(repeatedGroup, 0);
+ }
+ recordConsumer.endGroup();
+ }
+ }
+
+ private class MapWriter implements FieldWriter {
+ private final FieldWriter keyWriter;
+ private final FieldWriter valueWriter;
+
+ private MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
+ this.keyWriter = keyWriter;
+ this.valueWriter = valueWriter;
+ }
+
+ @Override
+ public void write(RowData row, int ordinal) {
+ MapData map = row.getMap(ordinal);
+ doWrite(map);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ MapData map = array.getMap(ordinal);
+ doWrite(map);
+ }
+
+ private void doWrite(MapData mapData) {
+ ArrayData keyArray = mapData.keyArray();
+ ArrayData valueArray = mapData.valueArray();
+ recordConsumer.startGroup();
+ if (mapData.size() > 0) {
+ final String repeatedGroup = "key_value";
+ final String kField = "key";
+ final String vField = "value";
+ recordConsumer.startField(repeatedGroup, 0);
+ for (int i = 0; i < mapData.size(); i++) {
+ recordConsumer.startGroup();
+ // key
+ recordConsumer.startField(kField, 0);
+ this.keyWriter.write(keyArray, i);
+ recordConsumer.endField(kField, 0);
+ // value
+ if (!valueArray.isNullAt(i)) {
+            // Only creates the "value" field if the value is not null
+ recordConsumer.startField(vField, 1);
+ this.valueWriter.write(valueArray, i);
+ recordConsumer.endField(vField, 1);
+ }
+ recordConsumer.endGroup();
+ }
+ recordConsumer.endField(repeatedGroup, 0);
+ }
+ recordConsumer.endGroup();
+ }
+ }
+
+ private class RowWriter implements FieldWriter {
+ private final String[] fieldNames;
+ private final FieldWriter[] fieldWriters;
+
+ private RowWriter(String[] fieldNames, FieldWriter[] fieldWriters) {
+ this.fieldNames = fieldNames;
+ this.fieldWriters = fieldWriters;
+ }
+
+ @Override
+ public void write(RowData row, int ordinal) {
+ RowData nested = row.getRow(ordinal, fieldWriters.length);
+ doWrite(nested);
+ }
+
+ @Override
+ public void write(ArrayData array, int ordinal) {
+ RowData nested = array.getRow(ordinal, fieldWriters.length);
+ doWrite(nested);
+ }
+
+ private void doWrite(RowData row) {
+ recordConsumer.startGroup();
+ for (int i = 0; i < row.getArity(); i++) {
+ if (!row.isNullAt(i)) {
+ String fieldName = fieldNames[i];
+ recordConsumer.startField(fieldName, i);
+ fieldWriters[i].write(row, i);
+ recordConsumer.endField(fieldName, i);
+ }
+ }
+ recordConsumer.endGroup();
+ }
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 80fda29aa..5da45bf25 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -25,9 +25,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -616,6 +618,45 @@ public class ParquetSchemaConverter {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
.named(name);
}
+ case ARRAY:
+ // group (LIST) {
+ // repeated group list {
+ // element;
+ // }
+ // }
+ ArrayType arrayType = (ArrayType) type;
+ LogicalType elementType = arrayType.getElementType();
+ return Types
+ .buildGroup(repetition).as(OriginalType.LIST)
+ .addField(
+ Types.repeatedGroup()
+ .addField(convertToParquetType("element", elementType, repetition))
+ .named("list"))
+ .named(name);
+ case MAP:
+ // group (MAP) {
+ // repeated group key_value {
+ // required key;
+ // value;
+ // }
+ // }
+ MapType mapType = (MapType) type;
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ return Types
+ .buildGroup(repetition).as(OriginalType.MAP)
+ .addField(
+ Types
+ .repeatedGroup()
+ .addField(convertToParquetType("key", keyType, repetition))
+ .addField(convertToParquetType("value", valueType, repetition))
+ .named("key_value"))
+ .named(name);
+ case ROW:
+ RowType rowType = (RowType) type;
+        Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
+ rowType.getFields().forEach(field -> builder.addField(convertToParquetType(field.getName(), field.getType(), repetition)));
+ return builder.named(name);
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
new file mode 100644
index 000000000..5305bcc8a
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row.parquet;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link ParquetSchemaConverter}.
+ */
+public class TestParquetSchemaConverter {
+ @Test
+ void testConvertComplexTypes() {
+ DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("f_array",
+ DataTypes.ARRAY(DataTypes.CHAR(10))),
+ DataTypes.FIELD("f_map",
+ DataTypes.MAP(DataTypes.INT(), DataTypes.VARCHAR(20))),
+ DataTypes.FIELD("f_row",
+ DataTypes.ROW(
+ DataTypes.FIELD("f_row_f0", DataTypes.INT()),
+ DataTypes.FIELD("f_row_f1", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("f_row_f2",
+ DataTypes.ROW(
+ DataTypes.FIELD("f_row_f2_f0", DataTypes.INT()),
+ DataTypes.FIELD("f_row_f2_f1", DataTypes.VARCHAR(10)))))));
+ org.apache.parquet.schema.MessageType messageType =
+ ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType) dataType.getLogicalType());
+ assertThat(messageType.getColumns().size(), is(7));
+ final String expected = "message converted {\n"
+ + " optional group f_array (LIST) {\n"
+ + " repeated group list {\n"
+ + " optional binary element (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + " optional group f_map (MAP) {\n"
+ + " repeated group key_value {\n"
+ + " optional int32 key;\n"
+ + " optional binary value (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + " optional group f_row {\n"
+ + " optional int32 f_row_f0;\n"
+ + " optional binary f_row_f1 (UTF8);\n"
+ + " optional group f_row_f2 {\n"
+ + " optional int32 f_row_f2_f0;\n"
+ + " optional binary f_row_f2_f1 (UTF8);\n"
+ + " }\n"
+ + " }\n"
+ + "}\n";
+ assertThat(messageType.toString(), is(expected));
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 090ed29b8..a72b885a2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -30,7 +30,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
/**
@@ -43,6 +46,7 @@ import java.util.List;
* @see StreamWriteOperatorCoordinator
*/
public class AppendWriteFunction extends AbstractStreamWriteFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class);
private static final long serialVersionUID = 1L;
@@ -113,14 +117,19 @@ public class AppendWriteFunction extends AbstractStreamWriteFunction {
}
private void flushData(boolean endInput) {
- if (this.writerHelper == null) {
- // does not process any inputs, returns early.
- return;
+    final List<WriteStatus> writeStatus;
+ final String instant;
+ if (this.writerHelper != null) {
+ writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+ instant = this.writerHelper.getInstantTime();
+ } else {
+ LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
+ writeStatus = Collections.emptyList();
+ instant = instantToWrite(false);
}
-    final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
- .instantTime(this.writerHelper.getInstantTime())
+ .instantTime(instant)
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
index 64eb1f485..c615283c7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetColumnarRowSplitReader.java
@@ -18,10 +18,11 @@
package org.apache.hudi.table.format.cow;
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.ColumnarRowData;
import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -208,11 +209,14 @@ public class ParquetColumnarRowSplitReader implements Closeable {
private WritableColumnVector[] createWritableVectors() {
WritableColumnVector[] columns = new WritableColumnVector[requestedTypes.length];
+    List<Type> types = requestedSchema.getFields();
+    List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
for (int i = 0; i < requestedTypes.length; i++) {
columns[i] = createWritableColumnVector(
batchSize,
requestedTypes[i],
- requestedSchema.getColumns().get(i).getPrimitiveType());
+ types.get(i),
+ descriptors);
}
return columns;
}
@@ -236,11 +240,6 @@ public class ParquetColumnarRowSplitReader implements Closeable {
* Check that the requested schema is supported.
*/
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
- Type t = requestedSchema.getFields().get(i);
- if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
- throw new UnsupportedOperationException("Complex types not supported.");
- }
-
String[] colPath = requestedSchema.getPaths().get(i);
if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
@@ -322,14 +321,16 @@ public class ParquetColumnarRowSplitReader implements Closeable {
throw new IOException("expecting more rows but reached last block. Read "
+ rowsReturned + " out of " + totalRowCount);
}
+    List<Type> types = requestedSchema.getFields();
    List<ColumnDescriptor> columns = requestedSchema.getColumns();
- columnReaders = new ColumnReader[columns.size()];
- for (int i = 0; i < columns.size(); ++i) {
+ columnReaders = new ColumnReader[types.size()];
+ for (int i = 0; i < types.size(); ++i) {
columnReaders[i] = createColumnReader(
utcTimestamp,
requestedTypes[i],
- columns.get(i),
- pages.getPageReader(columns.get(i)));
+ types.get(i),
+ columns,
+ pages);
}
totalCountLoadedSoFar += pages.getRowCount();
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
index 2749f02f3..4705b2f63 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
@@ -32,9 +32,9 @@ import org.apache.flink.table.data.vector.DecimalColumnVector;
*/
public class ParquetDecimalVector implements DecimalColumnVector {
- private final ColumnVector vector;
+ public final ColumnVector vector;
- ParquetDecimalVector(ColumnVector vector) {
+ public ParquetDecimalVector(ColumnVector vector) {
this.vector = vector;
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 29c1b205b..10a2dcd5a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -18,6 +18,15 @@
package org.apache.hudi.table.format.cow;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
@@ -32,7 +41,6 @@ import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
import org.apache.flink.table.data.vector.heap.HeapByteVector;
import org.apache.flink.table.data.vector.heap.HeapBytesVector;
@@ -44,16 +52,24 @@ import org.apache.flink.table.data.vector.heap.HeapShortVector;
import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.math.BigDecimal;
@@ -61,6 +77,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -252,11 +269,40 @@ public class ParquetSplitReaderUtil {
}
}
+  private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+    List<ColumnDescriptor> filtered = new ArrayList<>();
+ for (ColumnDescriptor descriptor : columns) {
+ if (depth >= descriptor.getPath().length) {
+ throw new InvalidSchemaException("Expect depth " + depth + " for schema: " + descriptor);
+ }
+ if (type.getName().equals(descriptor.getPath()[depth])) {
+ filtered.add(descriptor);
+ }
+ }
+ ValidationUtils.checkState(filtered.size() > 0, "Corrupted Parquet schema");
+ return filtered;
+ }
+
public static ColumnReader createColumnReader(
boolean utcTimestamp,
LogicalType fieldType,
- ColumnDescriptor descriptor,
- PageReader pageReader) throws IOException {
+ Type physicalType,
+      List<ColumnDescriptor> descriptors,
+ PageReadStore pages) throws IOException {
+ return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
+ pages, 0);
+ }
+
+ private static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+      List<ColumnDescriptor> columns,
+ PageReadStore pages,
+ int depth) throws IOException {
+    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+ ColumnDescriptor descriptor = descriptors.get(0);
+ PageReader pageReader = pages.getPageReader(descriptor);
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
return new BooleanColumnReader(descriptor, pageReader);
@@ -303,6 +349,45 @@ public class ParquetSplitReaderUtil {
default:
throw new AssertionError();
}
+ case ARRAY:
+ return new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ fieldType);
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ ArrayColumnReader keyReader =
+ new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ new ArrayType(mapType.getKeyType()));
+ ArrayColumnReader valueReader =
+ new ArrayColumnReader(
+ descriptors.get(1),
+ pages.getPageReader(descriptors.get(1)),
+ utcTimestamp,
+ descriptors.get(1).getPrimitiveType(),
+ new ArrayType(mapType.getValueType()));
+ return new MapColumnReader(keyReader, valueReader, fieldType);
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+      List<ColumnReader> fieldReaders = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ fieldReaders.add(
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(i),
+ descriptors,
+ pages,
+ depth + 1));
+ }
+ return new RowColumnReader(fieldReaders);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
@@ -311,7 +396,19 @@ public class ParquetSplitReaderUtil {
public static WritableColumnVector createWritableColumnVector(
int batchSize,
LogicalType fieldType,
- PrimitiveType primitiveType) {
+ Type physicalType,
+ List descriptors) {
+ return createWritableColumnVector(batchSize, fieldType, physicalType, descriptors, 0);
+ }
+
+ private static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ Type physicalType,
+ List columns,
+ int depth) {
+ List descriptors = filterDescriptors(depth, physicalType, columns);
+ PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
@@ -371,6 +468,49 @@ public class ParquetSplitReaderUtil {
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) fieldType;
+ return new HeapArrayVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ arrayType.getElementType(),
+ physicalType,
+ descriptors,
+ depth));
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
+ // the map column has three level paths.
+ return new HeapMapColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ mapType.getKeyType(),
+ repeatedType.getType(0),
+ descriptors,
+ depth + 2),
+ createWritableColumnVector(
+ batchSize,
+ mapType.getValueType(),
+ repeatedType.getType(1),
+ descriptors,
+ depth + 2));
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ WritableColumnVector[] columnVectors =
+ new WritableColumnVector[rowType.getFieldCount()];
+ for (int i = 0; i < columnVectors.length; i++) {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(i),
+ descriptors,
+ depth + 1);
+ }
+ return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
new file mode 100644
index 000000000..a16a4dd8d
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarArrayData.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.hudi.table.format.cow.vector.MapColumnVector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.ByteColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.RowColumnVector;
+import org.apache.flink.table.data.vector.ShortColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+import java.util.Arrays;
+
+/**
+ * Columnar array to support access to vector column data.
+ *
+ * <p>References {@code org.apache.flink.table.data.ColumnarArrayData} to include FLINK-15390.
+ */
+public final class ColumnarArrayData implements ArrayData, TypedSetters {
+
+ private final ColumnVector data;
+ private final int offset;
+ private final int numElements;
+
+ public ColumnarArrayData(ColumnVector data, int offset, int numElements) {
+ this.data = data;
+ this.offset = offset;
+ this.numElements = numElements;
+ }
+
+ @Override
+ public int size() {
+ return numElements;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return data.isNullAt(offset + pos);
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return ((BooleanColumnVector) data).getBoolean(offset + pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return ((ByteColumnVector) data).getByte(offset + pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return ((ShortColumnVector) data).getShort(offset + pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return ((IntColumnVector) data).getInt(offset + pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return ((LongColumnVector) data).getLong(offset + pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return ((FloatColumnVector) data).getFloat(offset + pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return ((DoubleColumnVector) data).getDouble(offset + pos);
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ BytesColumnVector.Bytes byteArray = getByteArray(pos);
+ return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int pos, int precision, int scale) {
+ return ((DecimalColumnVector) data).getDecimal(offset + pos, precision, scale);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int pos, int precision) {
+ return ((TimestampColumnVector) data).getTimestamp(offset + pos, precision);
+ }
+
+ @Override
+ public RawValueData getRawValue(int pos) {
+ throw new UnsupportedOperationException("RawValueData is not supported.");
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ BytesColumnVector.Bytes byteArray = getByteArray(pos);
+ if (byteArray.len == byteArray.data.length) {
+ return byteArray.data;
+ } else {
+ return Arrays.copyOfRange(byteArray.data, byteArray.offset, byteArray.offset + byteArray.len);
+ }
+ }
+
+ @Override
+ public ArrayData getArray(int pos) {
+ return ((ArrayColumnVector) data).getArray(offset + pos);
+ }
+
+ @Override
+ public MapData getMap(int pos) {
+ return ((MapColumnVector) data).getMap(offset + pos);
+ }
+
+ @Override
+ public RowData getRow(int pos, int numFields) {
+ return ((RowColumnVector) data).getRow(offset + pos);
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDecimal(int pos, DecimalData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setTimestamp(int pos, TimestampData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] res = new boolean[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getBoolean(i);
+ }
+ return res;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] res = new byte[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getByte(i);
+ }
+ return res;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] res = new short[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getShort(i);
+ }
+ return res;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] res = new int[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getInt(i);
+ }
+ return res;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] res = new long[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getLong(i);
+ }
+ return res;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] res = new float[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getFloat(i);
+ }
+ return res;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] res = new double[numElements];
+ for (int i = 0; i < numElements; i++) {
+ res[i] = getDouble(i);
+ }
+ return res;
+ }
+
+ private BytesColumnVector.Bytes getByteArray(int pos) {
+ return ((BytesColumnVector) data).getBytes(offset + pos);
+ }
+}
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
new file mode 100644
index 000000000..9792e87ec
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarMapData.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.hudi.table.format.cow.data.ColumnarArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Columnar map to support access to vector column data.
+ *
+ * <p>Referenced from flink 1.14.0 {@code org.apache.flink.table.data.ColumnarMapData}.
+ */
+public final class ColumnarMapData implements MapData {
+
+ private final ColumnVector keyColumnVector;
+ private final ColumnVector valueColumnVector;
+ private final int offset;
+ private final int size;
+
+ public ColumnarMapData(
+ ColumnVector keyColumnVector,
+ ColumnVector valueColumnVector,
+ int offset,
+ int size) {
+ this.keyColumnVector = keyColumnVector;
+ this.valueColumnVector = valueColumnVector;
+ this.offset = offset;
+ this.size = size;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public ArrayData keyArray() {
+ return new ColumnarArrayData(keyColumnVector, offset, size);
+ }
+
+ @Override
+ public ArrayData valueArray() {
+ return new ColumnarArrayData(valueColumnVector, offset, size);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ "ColumnarMapData do not support equals, please compare fields one by one!");
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException(
+ "ColumnarMapData do not support hashCode, please hash fields one by one!");
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
new file mode 100644
index 000000000..ebb4ca26f
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/data/ColumnarRowData.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.data;
+
+import org.apache.hudi.table.format.cow.vector.VectorizedColumnBatch;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.TypedSetters;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Columnar row to support access to vector column data.
+ * It is a row view in {@link VectorizedColumnBatch}.
+ *
+ * <p>References {@code org.apache.flink.table.data.ColumnarRowData} to include FLINK-15390.
+ */
+public final class ColumnarRowData implements RowData, TypedSetters {
+
+ private RowKind rowKind = RowKind.INSERT;
+ private VectorizedColumnBatch vectorizedColumnBatch;
+ private int rowId;
+
+ public ColumnarRowData() {
+ }
+
+ public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
+ this(vectorizedColumnBatch, 0);
+ }
+
+ public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch, int rowId) {
+ this.vectorizedColumnBatch = vectorizedColumnBatch;
+ this.rowId = rowId;
+ }
+
+ public void setVectorizedColumnBatch(VectorizedColumnBatch vectorizedColumnBatch) {
+ this.vectorizedColumnBatch = vectorizedColumnBatch;
+ this.rowId = 0;
+ }
+
+ public void setRowId(int rowId) {
+ this.rowId = rowId;
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return rowKind;
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ this.rowKind = kind;
+ }
+
+ @Override
+ public int getArity() {
+ return vectorizedColumnBatch.getArity();
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return vectorizedColumnBatch.isNullAt(rowId, pos);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return vectorizedColumnBatch.getBoolean(rowId, pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return vectorizedColumnBatch.getByte(rowId, pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return vectorizedColumnBatch.getShort(rowId, pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return vectorizedColumnBatch.getInt(rowId, pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return vectorizedColumnBatch.getLong(rowId, pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return vectorizedColumnBatch.getFloat(rowId, pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return vectorizedColumnBatch.getDouble(rowId, pos);
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ BytesColumnVector.Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
+ return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int pos, int precision, int scale) {
+ return vectorizedColumnBatch.getDecimal(rowId, pos, precision, scale);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int pos, int precision) {
+ return vectorizedColumnBatch.getTimestamp(rowId, pos, precision);
+ }
+
+ @Override
+ public RawValueData getRawValue(int pos) {
+ throw new UnsupportedOperationException("RawValueData is not supported.");
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ BytesColumnVector.Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
+ if (byteArray.len == byteArray.data.length) {
+ return byteArray.data;
+ } else {
+ byte[] ret = new byte[byteArray.len];
+ System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len);
+ return ret;
+ }
+ }
+
+ @Override
+ public RowData getRow(int pos, int numFields) {
+ return vectorizedColumnBatch.getRow(rowId, pos);
+ }
+
+ @Override
+ public ArrayData getArray(int pos) {
+ return vectorizedColumnBatch.getArray(rowId, pos);
+ }
+
+ @Override
+ public MapData getMap(int pos) {
+ return vectorizedColumnBatch.getMap(rowId, pos);
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setDecimal(int pos, DecimalData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public void setTimestamp(int pos, TimestampData value, int precision) {
+ throw new UnsupportedOperationException("Not support the operation!");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ "ColumnarRowData do not support equals, please compare fields one by one!");
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException(
+ "ColumnarRowData do not support hashCode, please hash fields one by one!");
+ }
+}
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
new file mode 100644
index 000000000..f4c15b6a9
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarArrayData;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap array column vector.
+ */
+public class HeapArrayVector extends AbstractHeapVector
+ implements WritableColumnVector, ArrayColumnVector {
+
+ public long[] offsets;
+ public long[] lengths;
+ public ColumnVector child;
+ private int size;
+
+ public HeapArrayVector(int len) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ }
+
+ public HeapArrayVector(int len, ColumnVector vector) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ this.child = vector;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public int getLen() {
+ return this.isNull.length;
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarArrayData(child, (int) offset, (int) length);
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
new file mode 100644
index 000000000..f05a2e734
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarMapData;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap map column vector.
+ */
+public class HeapMapColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, MapColumnVector {
+
+ private long[] offsets;
+ private long[] lengths;
+ private int size;
+ private ColumnVector keys;
+ private ColumnVector values;
+
+ public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+ super(len);
+ size = 0;
+ offsets = new long[len];
+ lengths = new long[len];
+ this.keys = keys;
+ this.values = values;
+ }
+
+ public void setOffsets(long[] offsets) {
+ this.offsets = offsets;
+ }
+
+ public void setLengths(long[] lengths) {
+ this.lengths = lengths;
+ }
+
+ public void setKeys(ColumnVector keys) {
+ this.keys = keys;
+ }
+
+ public void setValues(ColumnVector values) {
+ this.values = values;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarMapData(keys, values, (int) offset, (int) length);
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
new file mode 100644
index 000000000..ad05a612c
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap row column vector.
+ */
+public class HeapRowColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, RowColumnVector {
+
+ public WritableColumnVector[] vectors;
+
+ public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
+ super(len);
+ this.vectors = vectors;
+ }
+
+ @Override
+ public ColumnarRowData getRow(int i) {
+ ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
+ columnarRowData.setRowId(i);
+ return columnarRowData;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
new file mode 100644
index 000000000..38424dad7
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/MapColumnVector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Map column vector.
+ */
+public interface MapColumnVector extends ColumnVector {
+ MapData getMap(int i);
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
new file mode 100644
index 000000000..293af7b9c
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/RowColumnVector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.hudi.table.format.cow.data.ColumnarRowData;
+
+import org.apache.flink.table.data.vector.ColumnVector;
+
+/**
+ * Row column vector.
+ */
+public interface RowColumnVector extends ColumnVector {
+ ColumnarRowData getRow(int i);
+}
\ No newline at end of file
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
new file mode 100644
index 000000000..9eee55d1e
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/VectorizedColumnBatch.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.BooleanColumnVector;
+import org.apache.flink.table.data.vector.ByteColumnVector;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.DoubleColumnVector;
+import org.apache.flink.table.data.vector.FloatColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
+import org.apache.flink.table.data.vector.ShortColumnVector;
+import org.apache.flink.table.data.vector.TimestampColumnVector;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the unit
+ * of query execution, organized to minimize the cost per row.
+ *
+ *
+ * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive VectorizedRowBatch.
+ *
+ * <p>References {@code org.apache.flink.table.data.vector.VectorizedColumnBatch}
+ * to include FLINK-15390.
+ */
+public class VectorizedColumnBatch implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * This number is carefully chosen to minimize overhead and typically allows one
+   * VectorizedColumnBatch to fit in cache.
+   */
+  public static final int DEFAULT_SIZE = 2048;
+
+  private int numRows; // number of valid rows currently held in this batch
+  public final ColumnVector[] columns; // one vector per column, indexed by colId
+
+  public VectorizedColumnBatch(ColumnVector[] vectors) {
+    this.columns = vectors;
+  }
+
+  public void setNumRows(int numRows) {
+    this.numRows = numRows;
+  }
+
+  public int getNumRows() {
+    return numRows;
+  }
+
+  public int getArity() {
+    return columns.length; // arity == number of column vectors supplied at construction
+  }
+
+  public boolean isNullAt(int rowId, int colId) {
+    return columns[colId].isNullAt(rowId);
+  }
+
+  public boolean getBoolean(int rowId, int colId) {
+    return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
+  }
+
+  public byte getByte(int rowId, int colId) {
+    return ((ByteColumnVector) columns[colId]).getByte(rowId);
+  }
+
+  public short getShort(int rowId, int colId) {
+    return ((ShortColumnVector) columns[colId]).getShort(rowId);
+  }
+
+  public int getInt(int rowId, int colId) {
+    return ((IntColumnVector) columns[colId]).getInt(rowId);
+  }
+
+  public long getLong(int rowId, int colId) {
+    return ((LongColumnVector) columns[colId]).getLong(rowId);
+  }
+
+  public float getFloat(int rowId, int colId) {
+    return ((FloatColumnVector) columns[colId]).getFloat(rowId);
+  }
+
+  public double getDouble(int rowId, int colId) {
+    return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
+  }
+
+  public BytesColumnVector.Bytes getByteArray(int rowId, int colId) {
+    return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+  }
+
+  private byte[] getBytes(int rowId, int colId) { // NOTE(review): not referenced anywhere in this class — confirm it is needed or remove
+    BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
+    if (byteArray.len == byteArray.data.length) { // value spans the whole backing array: return it without copying
+      return byteArray.data;
+    } else {
+      return byteArray.getBytes(); // value is an offset/len slice — delegate to Bytes#getBytes
+    }
+  }
+
+  public String getString(int rowId, int colId) {
+    BytesColumnVector.Bytes byteArray = getByteArray(rowId, colId);
+    return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8); // decode the slice directly, no intermediate byte[] copy
+  }
+
+  public DecimalData getDecimal(int rowId, int colId, int precision, int scale) {
+    return ((DecimalColumnVector) (columns[colId])).getDecimal(rowId, precision, scale);
+  }
+
+  public TimestampData getTimestamp(int rowId, int colId, int precision) {
+    return ((TimestampColumnVector) (columns[colId])).getTimestamp(rowId, precision);
+  }
+
+  public ArrayData getArray(int rowId, int colId) {
+    return ((ArrayColumnVector) columns[colId]).getArray(rowId);
+  }
+
+  public RowData getRow(int rowId, int colId) {
+    return ((RowColumnVector) columns[colId]).getRow(rowId); // RowColumnVector is the Hudi interface added in this patch, not a Flink type
+  }
+
+  public MapData getMap(int rowId, int colId) {
+    return ((MapColumnVector) columns[colId]).getMap(rowId); // MapColumnVector is the Hudi interface added in this patch, not a Flink type
+  }
+}
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
new file mode 100644
index 000000000..256d4c1bb
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Array {@link ColumnReader}.
+ */
+public class ArrayColumnReader extends BaseVectorizedColumnReader {
+
+  // The value read in last time
+  private Object lastValue;
+
+  // flag to indicate if there is no data in parquet data page
+  private boolean eof = false;
+
+  // flag to indicate if it's the first time to read parquet data page with this instance
+  boolean isFirstRow = true; // NOTE(review): package-private while the sibling flags are private — confirm, or make private
+
+  public ArrayColumnReader(
+      ColumnDescriptor descriptor,
+      PageReader pageReader,
+      boolean isUtcTimestamp,
+      Type type,
+      LogicalType logicalType)
+      throws IOException {
+    super(descriptor, pageReader, isUtcTimestamp, type, logicalType); // no extra state to initialize beyond the base reader
+  }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ HeapArrayVector lcv = (HeapArrayVector) vector;
+ // before readBatch, initial the size of offsets & lengths as the default value,
+ // the actual size will be assigned in setChildrenInfo() after reading complete.
+ lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+ lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+ // Because the length of ListColumnVector.child can't be known now,
+ // the valueList will save all data for ListColumnVector temporary.
+ List