 * Use {@code ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
* overrides {@link #createInputSplits(int)} and {@link #close()} to change the behaviors.
*/
public class MergeOnReadInputFormat
@@ -299,7 +297,7 @@ public class MergeOnReadInputFormat
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
FilePathUtils.extractPartitionKeys(this.conf));
 * NOTE: reference from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modified to support INT64
+ * based TIMESTAMP_MILLIS as ConvertedType; should be removed when Flink supports that.
+ */
+ */
+public class ParquetSplitReaderUtil {
+
+ /**
+ * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
+ */
+ public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
+ boolean utcTimestamp,
+ boolean caseSensitive,
+ Configuration conf,
+ String[] fullFieldNames,
+ DataType[] fullFieldTypes,
+ * Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}
+ * because it is not public.
+ */
+public class ParquetDecimalVector implements DecimalColumnVector {
+
+ // Wrapped column vector that physically stores the decimal values.
+ // Must be a BytesColumnVector for getDecimal to succeed (see the cast below).
+ public final ColumnVector vector;
+
+ /**
+ * Wraps the given column vector; no copy is made.
+ *
+ * @param vector the underlying vector holding the raw decimal bytes
+ */
+ public ParquetDecimalVector(ColumnVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ // Read the unscaled byte representation at row i from the wrapped
+ // BytesColumnVector and build a DecimalData with the requested
+ // precision and scale.
+ return DecimalData.fromUnscaledBytes(
+ ((BytesColumnVector) vector).getBytes(i).getBytes(),
+ precision,
+ scale);
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ // Null checks are delegated to the wrapped vector.
+ return vector.isNullAt(i);
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
new file mode 100644
index 000000000..07416a371
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.ParquetDictionary;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
+/**
+ * Abstract {@link ColumnReader}.
+ * See {@link org.apache.parquet.column.impl.ColumnReaderImpl},
+ * part of the code is referred from Apache Spark and Apache Parquet.
+ *
+ * Note: Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader}
+ * because some of its methods have package scope.
+ */
+public abstract class AbstractColumnReader