[HUDI-4357] Support flink 1.15.x (#6050)

This commit is contained in:
Danny Chan
2022-07-06 13:42:58 +08:00
committed by GitHub
parent b18c32379f
commit 7eeaff9ee0
91 changed files with 7799 additions and 82 deletions

View File

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.adapter;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
/**
* Adapter clazz for {@code DataStreamScanProvider}.
*/
public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {
}
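
A hedged illustration (not part of this commit) of why this pass-through adapter exists: a table source can return the adapter type so the same provider code compiles against each supported Flink version. The class name is illustrative, the body is a placeholder rather than Hudi's real read pipeline, and the single-argument produceDataStream signature of the Flink version this adapter targets is assumed.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

public class ExampleScanProvider implements DataStreamScanProviderAdapter {
  @Override
  public boolean isBounded() {
    return false; // streaming read
  }

  @Override
  public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
    // Placeholder body; the real provider would assemble Hudi's split-monitor and reader operators.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}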

View File

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.adapter;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
/**
* Adapter clazz for {@code DataStreamSinkProvider}.
*/
public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
}
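
Similarly, a hedged sketch (not from this commit) of a sink provider returning this adapter type, assuming the single-argument consumeDataStream signature of the Flink version this pass-through adapter targets; the class name is illustrative and DiscardingSink only stands in for Hudi's real write pipeline.

import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.data.RowData;

public class ExampleSinkProvider {
  // DiscardingSink is only a stand-in for Hudi's real write pipeline.
  static DataStreamSinkProviderAdapter discardingProvider() {
    return dataStream -> dataStream.addSink(new DiscardingSink<RowData>()).name("example_sink");
  }
}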

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.adapter;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
@@ -25,6 +26,9 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.factories.FactoryUtil;
/**
* Adapter utils.
@@ -45,4 +49,12 @@ public class Utils {
watermarkInterval,
-1);
}
public static FactoryUtil.DefaultDynamicTableContext getTableContext(
ObjectIdentifier tablePath,
ResolvedCatalogTable catalogTable,
ReadableConfig conf) {
return new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
conf, Thread.currentThread().getContextClassLoader(), false);
}
}
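
A hedged usage sketch of the new getTableContext helper; the class name, catalog/database/table names, and configuration below are illustrative, and constructing the ResolvedCatalogTable is assumed to happen elsewhere.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.factories.DynamicTableFactory;

public class TableContextExample {
  // Builds a factory context for a table named "t1"; the ResolvedCatalogTable is supplied by the caller.
  static DynamicTableFactory.Context contextFor(ResolvedCatalogTable catalogTable) {
    ObjectIdentifier tablePath = ObjectIdentifier.of("hudi_catalog", "default_db", "t1");
    return Utils.getTableContext(tablePath, catalogTable, new Configuration());
  }
}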

View File

@@ -0,0 +1,523 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader;
import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader;
import org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
import org.apache.flink.table.data.vector.heap.HeapByteVector;
import org.apache.flink.table.data.vector.heap.HeapBytesVector;
import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
import org.apache.flink.table.data.vector.heap.HeapFloatVector;
import org.apache.flink.table.data.vector.heap.HeapIntVector;
import org.apache.flink.table.data.vector.heap.HeapLongVector;
import org.apache.flink.table.data.vector.heap.HeapShortVector;
import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
import static org.apache.parquet.Preconditions.checkArgument;
/**
* Util for generating {@link ParquetColumnarRowSplitReader}.
*
* <p>NOTE: Adapted from Flink release 1.11.2 {@code ParquetSplitReaderUtil} and modified to support INT64-based
* TIMESTAMP_MILLIS as ConvertedType; it should be removed once Flink supports that.
*/
public class ParquetSplitReaderUtil {
/**
* Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
*/
public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
boolean utcTimestamp,
boolean caseSensitive,
Configuration conf,
String[] fullFieldNames,
DataType[] fullFieldTypes,
Map<String, Object> partitionSpec,
int[] selectedFields,
int batchSize,
Path path,
long splitStart,
long splitLength) throws IOException {
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
.collect(Collectors.toList());
int[] selParquetFields = Arrays.stream(selectedFields)
.filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
.toArray();
ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
// create and initialize the row batch
ColumnVector[] vectors = new ColumnVector[selectedFields.length];
for (int i = 0; i < vectors.length; i++) {
String name = fullFieldNames[selectedFields[i]];
LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
vectors[i] = createVector(readVectors, selNonPartNames, name, type, partitionSpec, batchSize);
}
return new VectorizedColumnBatch(vectors);
};
return new ParquetColumnarRowSplitReader(
utcTimestamp,
caseSensitive,
conf,
Arrays.stream(selParquetFields)
.mapToObj(i -> fullFieldTypes[i].getLogicalType())
.toArray(LogicalType[]::new),
selNonPartNames.toArray(new String[0]),
gen,
batchSize,
new org.apache.hadoop.fs.Path(path.toUri()),
splitStart,
splitLength);
}
private static ColumnVector createVector(
ColumnVector[] readVectors,
List<String> selNonPartNames,
String name,
LogicalType type,
Map<String, Object> partitionSpec,
int batchSize) {
if (partitionSpec.containsKey(name)) {
return createVectorFromConstant(type, partitionSpec.get(name), batchSize);
}
ColumnVector readVector = readVectors[selNonPartNames.indexOf(name)];
if (readVector == null) {
// when the read vector is null, use a constant null vector instead
readVector = createVectorFromConstant(type, null, batchSize);
}
return readVector;
}
private static ColumnVector createVectorFromConstant(
LogicalType type,
Object value,
int batchSize) {
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
HeapBytesVector bsv = new HeapBytesVector(batchSize);
if (value == null) {
bsv.fillWithNulls();
} else {
bsv.fill(value instanceof byte[]
? (byte[]) value
: value.toString().getBytes(StandardCharsets.UTF_8));
}
return bsv;
case BOOLEAN:
HeapBooleanVector bv = new HeapBooleanVector(batchSize);
if (value == null) {
bv.fillWithNulls();
} else {
bv.fill((boolean) value);
}
return bv;
case TINYINT:
HeapByteVector byteVector = new HeapByteVector(batchSize);
if (value == null) {
byteVector.fillWithNulls();
} else {
byteVector.fill(((Number) value).byteValue());
}
return byteVector;
case SMALLINT:
HeapShortVector sv = new HeapShortVector(batchSize);
if (value == null) {
sv.fillWithNulls();
} else {
sv.fill(((Number) value).shortValue());
}
return sv;
case INTEGER:
HeapIntVector iv = new HeapIntVector(batchSize);
if (value == null) {
iv.fillWithNulls();
} else {
iv.fill(((Number) value).intValue());
}
return iv;
case BIGINT:
HeapLongVector lv = new HeapLongVector(batchSize);
if (value == null) {
lv.fillWithNulls();
} else {
lv.fill(((Number) value).longValue());
}
return lv;
case DECIMAL:
DecimalType decimalType = (DecimalType) type;
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
DecimalData decimal = value == null
? null
: Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
ColumnVector internalVector = createVectorFromConstant(
new VarBinaryType(),
decimal == null ? null : decimal.toUnscaledBytes(),
batchSize);
return new ParquetDecimalVector(internalVector);
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
if (value == null) {
fv.fillWithNulls();
} else {
fv.fill(((Number) value).floatValue());
}
return fv;
case DOUBLE:
HeapDoubleVector dv = new HeapDoubleVector(batchSize);
if (value == null) {
dv.fillWithNulls();
} else {
dv.fill(((Number) value).doubleValue());
}
return dv;
case DATE:
if (value instanceof LocalDate) {
value = Date.valueOf((LocalDate) value);
}
return createVectorFromConstant(
new IntType(),
value == null ? null : dateToInternal((Date) value),
batchSize);
case TIMESTAMP_WITHOUT_TIME_ZONE:
HeapTimestampVector tv = new HeapTimestampVector(batchSize);
if (value == null) {
tv.fillWithNulls();
} else {
tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value));
}
return tv;
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
List<ColumnDescriptor> filtered = new ArrayList<>();
for (ColumnDescriptor descriptor : columns) {
if (depth >= descriptor.getPath().length) {
throw new InvalidSchemaException("Expect depth " + depth + " for schema: " + descriptor);
}
if (type.getName().equals(descriptor.getPath()[depth])) {
filtered.add(descriptor);
}
}
ValidationUtils.checkState(filtered.size() > 0, "Corrupted Parquet schema");
return filtered;
}
public static ColumnReader createColumnReader(
boolean utcTimestamp,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> descriptors,
PageReadStore pages) throws IOException {
return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
pages, 0);
}
private static ColumnReader createColumnReader(
boolean utcTimestamp,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> columns,
PageReadStore pages,
int depth) throws IOException {
List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
ColumnDescriptor descriptor = descriptors.get(0);
PageReader pageReader = pages.getPageReader(descriptor);
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
return new BooleanColumnReader(descriptor, pageReader);
case TINYINT:
return new ByteColumnReader(descriptor, pageReader);
case DOUBLE:
return new DoubleColumnReader(descriptor, pageReader);
case FLOAT:
return new FloatColumnReader(descriptor, pageReader);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return new IntColumnReader(descriptor, pageReader);
case BIGINT:
return new LongColumnReader(descriptor, pageReader);
case SMALLINT:
return new ShortColumnReader(descriptor, pageReader);
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
return new BytesColumnReader(descriptor, pageReader);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
throw new AssertionError();
}
case DECIMAL:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return new IntColumnReader(descriptor, pageReader);
case INT64:
return new LongColumnReader(descriptor, pageReader);
case BINARY:
return new BytesColumnReader(descriptor, pageReader);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenBytesColumnReader(
descriptor, pageReader, ((DecimalType) fieldType).getPrecision());
default:
throw new AssertionError();
}
case ARRAY:
return new ArrayColumnReader(
descriptor,
pageReader,
utcTimestamp,
descriptor.getPrimitiveType(),
fieldType);
case MAP:
MapType mapType = (MapType) fieldType;
ArrayColumnReader keyReader =
new ArrayColumnReader(
descriptor,
pageReader,
utcTimestamp,
descriptor.getPrimitiveType(),
new ArrayType(mapType.getKeyType()));
ArrayColumnReader valueReader =
new ArrayColumnReader(
descriptors.get(1),
pages.getPageReader(descriptors.get(1)),
utcTimestamp,
descriptors.get(1).getPrimitiveType(),
new ArrayType(mapType.getValueType()));
return new MapColumnReader(keyReader, valueReader, fieldType);
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
List<ColumnReader> fieldReaders = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
fieldReaders.add(
createColumnReader(
utcTimestamp,
rowType.getTypeAt(i),
groupType.getType(i),
descriptors,
pages,
depth + 1));
}
return new RowColumnReader(fieldReaders);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
}
public static WritableColumnVector createWritableColumnVector(
int batchSize,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> descriptors) {
return createWritableColumnVector(batchSize, fieldType, physicalType, descriptors, 0);
}
private static WritableColumnVector createWritableColumnVector(
int batchSize,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> columns,
int depth) {
List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
"Unexpected type: %s", typeName);
return new HeapBooleanVector(batchSize);
case TINYINT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT32,
"Unexpected type: %s", typeName);
return new HeapByteVector(batchSize);
case DOUBLE:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
"Unexpected type: %s", typeName);
return new HeapDoubleVector(batchSize);
case FLOAT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
"Unexpected type: %s", typeName);
return new HeapFloatVector(batchSize);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT32,
"Unexpected type: %s", typeName);
return new HeapIntVector(batchSize);
case BIGINT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT64,
"Unexpected type: %s", typeName);
return new HeapLongVector(batchSize);
case SMALLINT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT32,
"Unexpected type: %s", typeName);
return new HeapShortVector(batchSize);
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.BINARY,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
"TIME_MICROS original type is not ");
return new HeapTimestampVector(batchSize);
case DECIMAL:
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
batchSize,
createWritableColumnVector(
batchSize,
arrayType.getElementType(),
physicalType,
descriptors,
depth));
case MAP:
MapType mapType = (MapType) fieldType;
GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
// the map column has a three-level path.
return new HeapMapColumnVector(
batchSize,
createWritableColumnVector(
batchSize,
mapType.getKeyType(),
repeatedType.getType(0),
descriptors,
depth + 2),
createWritableColumnVector(
batchSize,
mapType.getValueType(),
repeatedType.getType(1),
descriptors,
depth + 2));
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
WritableColumnVector[] columnVectors =
new WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
columnVectors[i] =
createWritableColumnVector(
batchSize,
rowType.getTypeAt(i),
groupType.getType(i),
descriptors,
depth + 1);
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
}
}
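
A hedged usage sketch of the public entry point genPartColumnarRowReader (class name, paths, schema, and partition value are illustrative, and the copied reader is assumed to keep Flink's reachedEnd()/nextRecord()/close() API): the "dt" partition column is not stored in the parquet file, so it is materialized as a constant vector.

import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;

import java.util.Collections;

public class CowReaderExample {
  public static void main(String[] args) throws Exception {
    ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
        true,                                   // utcTimestamp
        true,                                   // caseSensitive
        new Configuration(),
        new String[] {"id", "name", "dt"},
        new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()},
        Collections.<String, Object>singletonMap("dt", "2022-07-06"), // partition value injected as a constant vector
        new int[] {0, 1, 2},
        2048,                                   // batch size
        new Path("/tmp/hudi/dt=2022-07-06/base.parquet"),
        0,
        Long.MAX_VALUE);
    while (!reader.reachedEnd()) {
      RowData row = reader.nextRecord();
      // consume row ...
    }
    reader.close();
  }
}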

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.ColumnarArrayData;
import org.apache.flink.table.data.vector.ArrayColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
/**
* This class represents a nullable heap array column vector.
*/
public class HeapArrayVector extends AbstractHeapVector
implements WritableColumnVector, ArrayColumnVector {
public long[] offsets;
public long[] lengths;
public ColumnVector child;
private int size;
public HeapArrayVector(int len) {
super(len);
offsets = new long[len];
lengths = new long[len];
}
public HeapArrayVector(int len, ColumnVector vector) {
super(len);
offsets = new long[len];
lengths = new long[len];
this.child = vector;
}
public int getSize() {
return size;
}
public void setSize(int size) {
this.size = size;
}
public int getLen() {
return this.isNull.length;
}
@Override
public ArrayData getArray(int i) {
long offset = offsets[i];
long length = lengths[i];
return new ColumnarArrayData(child, (int) offset, (int) length);
}
}
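
A small hedged sketch of how offsets/lengths slice the child vector into per-row arrays; class name and values are illustrative.

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.vector.heap.HeapIntVector;

public class HeapArrayVectorExample {
  public static void main(String[] args) {
    HeapIntVector elements = new HeapIntVector(6);
    for (int i = 0; i < 6; i++) {
      elements.vector[i] = i * 10;                  // flattened elements: 0,10,20,30,40,50
    }
    HeapArrayVector arrays = new HeapArrayVector(2, elements);
    arrays.offsets[0] = 0; arrays.lengths[0] = 4;   // row 0 -> [0,10,20,30]
    arrays.offsets[1] = 4; arrays.lengths[1] = 2;   // row 1 -> [40,50]
    ArrayData row1 = arrays.getArray(1);            // view over child[4..6)
    System.out.println(row1.getInt(0));             // prints 40
  }
}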

View File

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector;
import org.apache.flink.table.data.ColumnarMapData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.MapColumnVector;
import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
/**
* This class represents a nullable heap map column vector.
*/
public class HeapMapColumnVector extends AbstractHeapVector
implements WritableColumnVector, MapColumnVector {
private long[] offsets;
private long[] lengths;
private int size;
private ColumnVector keys;
private ColumnVector values;
public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
super(len);
size = 0;
offsets = new long[len];
lengths = new long[len];
this.keys = keys;
this.values = values;
}
public void setOffsets(long[] offsets) {
this.offsets = offsets;
}
public void setLengths(long[] lengths) {
this.lengths = lengths;
}
public void setKeys(ColumnVector keys) {
this.keys = keys;
}
public void setValues(ColumnVector values) {
this.values = values;
}
public int getSize() {
return size;
}
public void setSize(int size) {
this.size = size;
}
@Override
public MapData getMap(int i) {
long offset = offsets[i];
long length = lengths[i];
return new ColumnarMapData(keys, values, (int) offset, (int) length);
}
}
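
A hedged sketch of the same offsets/lengths layout for maps, where keys and values are two parallel child vectors; class name and values are illustrative.

import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.vector.heap.HeapIntVector;

public class HeapMapColumnVectorExample {
  public static void main(String[] args) {
    HeapIntVector keys = new HeapIntVector(4);
    HeapIntVector values = new HeapIntVector(4);
    for (int i = 0; i < 4; i++) {
      keys.vector[i] = i;                       // flattened keys:   0,1,2,3
      values.vector[i] = i * 100;               // flattened values: 0,100,200,300
    }
    HeapMapColumnVector maps = new HeapMapColumnVector(2, keys, values);
    maps.setOffsets(new long[] {0, 2});
    maps.setLengths(new long[] {2, 2});         // row 0 -> {0:0, 1:100}, row 1 -> {2:200, 3:300}
    MapData row0 = maps.getMap(0);
    System.out.println(row0.size());            // prints 2
  }
}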

View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector;
import org.apache.flink.table.data.ColumnarRowData;
import org.apache.flink.table.data.vector.RowColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
/**
* This class represents a nullable heap row column vector.
*/
public class HeapRowColumnVector extends AbstractHeapVector
implements WritableColumnVector, RowColumnVector {
public WritableColumnVector[] vectors;
public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
super(len);
this.vectors = vectors;
}
@Override
public ColumnarRowData getRow(int i) {
ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
columnarRowData.setRowId(i);
return columnarRowData;
}
}
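
A hedged sketch showing that getRow returns a positioned view over the field vectors rather than a copy; class name and values are illustrative.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.vector.heap.HeapIntVector;

public class HeapRowColumnVectorExample {
  public static void main(String[] args) {
    HeapIntVector id = new HeapIntVector(3);
    HeapIntVector age = new HeapIntVector(3);
    id.vector[1] = 42;
    age.vector[1] = 7;
    HeapRowColumnVector rows = new HeapRowColumnVector(3, id, age);
    RowData row = rows.getRow(1);                                // view positioned at row 1 over both field vectors
    System.out.println(row.getInt(0) + "," + row.getInt(1));     // prints 42,7
  }
}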

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.DecimalColumnVector;
/**
* Parquet writes decimals as int32, int64 or binary; this class wraps the real vector to
* provide the {@link DecimalColumnVector} interface.
*
* <p>Adapted from Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}
* because that class is not public.
*/
public class ParquetDecimalVector implements DecimalColumnVector {
public final ColumnVector vector;
public ParquetDecimalVector(ColumnVector vector) {
this.vector = vector;
}
@Override
public DecimalData getDecimal(int i, int precision, int scale) {
return DecimalData.fromUnscaledBytes(
((BytesColumnVector) vector).getBytes(i).getBytes(),
precision,
scale);
}
@Override
public boolean isNullAt(int i) {
return vector.isNullAt(i);
}
}
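
A hedged sketch of the decimal round trip this wrapper enables (class name and values are illustrative): the unscaled value 12345 with precision 6 and scale 2 reads back as 123.45.

import java.math.BigInteger;

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.vector.heap.HeapBytesVector;

public class ParquetDecimalVectorExample {
  public static void main(String[] args) {
    HeapBytesVector bytes = new HeapBytesVector(1);
    byte[] unscaled = BigInteger.valueOf(12345).toByteArray();   // big-endian unscaled bytes
    bytes.appendBytes(0, unscaled, 0, unscaled.length);
    ParquetDecimalVector decimals = new ParquetDecimalVector(bytes);
    DecimalData d = decimals.getDecimal(0, 6, 2);
    System.out.println(d);                                       // prints 123.45
  }
}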

View File

@@ -0,0 +1,325 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.formats.parquet.vector.ParquetDictionary;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.data.vector.writable.WritableIntVector;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
/**
* Abstract {@link ColumnReader}.
* See {@link org.apache.parquet.column.impl.ColumnReaderImpl};
* part of the code is adapted from Apache Spark and Apache Parquet.
*
* <p>Note: Adapted from Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader}
* because some of its methods are package scoped.
*/
public abstract class AbstractColumnReader<V extends WritableColumnVector>
implements ColumnReader<V> {
private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.class);
private final PageReader pageReader;
/**
* The dictionary, if this column has dictionary encoding.
*/
protected final Dictionary dictionary;
/**
* Maximum definition level for this column.
*/
protected final int maxDefLevel;
protected final ColumnDescriptor descriptor;
/**
* Total number of values read.
*/
private long valuesRead;
/**
* value that indicates the end of the current page. That is, if valuesRead ==
* endOfPageValueCount, we are at the end of the page.
*/
private long endOfPageValueCount;
/**
* If true, the current page is dictionary encoded.
*/
private boolean isCurrentPageDictionaryEncoded;
/**
* Total values in the current page.
*/
private int pageValueCount;
/*
* Input streams:
* 1. Run-length encoding is applied to every value, so a run-length stream carries
* the run-length information.
* 2. The data stream may hold the real data, or dictionary ids that need to be decoded
* into real data via the Dictionary.
*
* Run length stream ------> Data stream
*                   |
*                   ------> Dictionary ids stream
*/
/**
* Run length decoder for data and dictionary.
*/
protected RunLengthDecoder runLenDecoder;
/**
* Data input stream.
*/
ByteBufferInputStream dataInputStream;
/**
* Dictionary decoder to wrap dictionary ids input stream.
*/
private RunLengthDecoder dictionaryIdsDecoder;
public AbstractColumnReader(
ColumnDescriptor descriptor,
PageReader pageReader) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
this.isCurrentPageDictionaryEncoded = true;
} catch (IOException e) {
throw new IOException("could not decode the dictionary for " + descriptor, e);
}
} else {
this.dictionary = null;
this.isCurrentPageDictionaryEncoded = false;
}
/*
* Total number of values in this column (in this row group).
*/
long totalValueCount = pageReader.getTotalValueCount();
if (totalValueCount == 0) {
throw new IOException("totalValueCount == 0");
}
}
protected void checkTypeName(PrimitiveType.PrimitiveTypeName expectedName) {
PrimitiveType.PrimitiveTypeName actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
Preconditions.checkArgument(
actualName == expectedName,
"Expected type name: %s, actual type name: %s",
expectedName,
actualName);
}
/**
* Reads `total` values from this columnReader into column.
*/
@Override
public final void readToVector(int readNumber, V vector) throws IOException {
int rowId = 0;
WritableIntVector dictionaryIds = null;
if (dictionary != null) {
dictionaryIds = vector.reserveDictionaryIds(readNumber);
}
while (readNumber > 0) {
// Compute the number of values we want to read in this page.
int leftInPage = (int) (endOfPageValueCount - valuesRead);
if (leftInPage == 0) {
DataPage page = pageReader.readPage();
if (page instanceof DataPageV1) {
readPageV1((DataPageV1) page);
} else if (page instanceof DataPageV2) {
readPageV2((DataPageV2) page);
} else {
throw new RuntimeException("Unsupported page type: " + page.getClass());
}
leftInPage = (int) (endOfPageValueCount - valuesRead);
}
int num = Math.min(readNumber, leftInPage);
if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
runLenDecoder.readDictionaryIds(
num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
vector.setDictionary(new ParquetDictionary(dictionary));
} else {
readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
}
} else {
if (vector.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is not. The batch
// does not support a mix of dictionary and not so we will decode the dictionary.
readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
}
vector.setDictionary(null);
readBatch(rowId, num, vector);
}
valuesRead += num;
rowId += num;
readNumber -= num;
}
}
private void readPageV1(DataPageV1 page) throws IOException {
this.pageValueCount = page.getValueCount();
ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
// Initialize the decoders.
if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
}
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
this.runLenDecoder = new RunLengthDecoder(bitWidth);
try {
BytesInput bytes = page.getBytes();
ByteBufferInputStream in = bytes.toInputStream();
rlReader.initFromPage(pageValueCount, in);
this.runLenDecoder.initFromStream(pageValueCount, in);
prepareNewPage(page.getValueEncoding(), in);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
}
private void readPageV2(DataPageV2 page) throws IOException {
this.pageValueCount = page.getValueCount();
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
// do not read the length from the stream. v2 pages handle dividing the page bytes.
this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
this.runLenDecoder.initFromStream(
this.pageValueCount, page.getDefinitionLevels().toInputStream());
try {
prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
}
private void prepareNewPage(
Encoding dataEncoding,
ByteBufferInputStream in) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
if (dictionary == null) {
throw new IOException("Could not read page in col "
+ descriptor
+ " as the dictionary was missing for encoding "
+ dataEncoding);
}
@SuppressWarnings("deprecation")
Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataInputStream = null;
this.dictionaryIdsDecoder = new RunLengthDecoder();
try {
this.dictionaryIdsDecoder.initFromStream(pageValueCount, in);
} catch (IOException e) {
throw new IOException("could not read dictionary in col " + descriptor, e);
}
this.isCurrentPageDictionaryEncoded = true;
} else {
if (dataEncoding != Encoding.PLAIN) {
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dictionaryIdsDecoder = null;
LOG.debug("init from page at offset {} for length {}", in.position(), in.available());
this.dataInputStream = in.remainingStream();
this.isCurrentPageDictionaryEncoded = false;
}
afterReadPage();
}
final ByteBuffer readDataBuffer(int length) {
try {
return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
}
}
/**
* After reading a page, we may need some initialization.
*/
protected void afterReadPage() {
}
/**
* Whether lazy decoding of dictionary ids is supported. See more in {@link ParquetDictionary}.
* If this returns false, all the data is decoded first.
*/
protected boolean supportLazyDecode() {
return true;
}
/**
* Read batch from {@link #runLenDecoder} and {@link #dataInputStream}.
*/
protected abstract void readBatch(int rowId, int num, V column);
/**
* Decodes dictionary ids to data, using {@link #runLenDecoder} and
* {@link #dictionaryIdsDecoder}.
*/
protected abstract void readBatchFromDictionaryIds(
int rowId,
int num,
V column,
WritableIntVector dictionaryIds);
}
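
A hedged sketch of how the pieces above fit together for one column in one row group; the class name is illustrative, the MessageType and PageReadStore are assumed to come from a Parquet file footer, the first schema field is assumed to be a plain INT32 column, and error handling is omitted.

import java.io.IOException;
import java.util.List;

import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.IntType;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.MessageType;

public class ColumnReaderExample {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static WritableColumnVector readIntColumn(MessageType schema, PageReadStore rowGroup, int batchSize)
      throws IOException {
    List<ColumnDescriptor> columns = schema.getColumns();
    // allocate the heap vector and the matching reader for the first field, then fill one batch
    WritableColumnVector vector = ParquetSplitReaderUtil.createWritableColumnVector(
        batchSize, new IntType(), schema.getType(0), columns);
    ColumnReader reader = ParquetSplitReaderUtil.createColumnReader(
        true, new IntType(), schema.getType(0), columns, rowGroup);
    reader.readToVector(batchSize, vector);
    return vector;
  }
}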

View File

@@ -0,0 +1,473 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
import org.apache.flink.table.data.vector.heap.HeapByteVector;
import org.apache.flink.table.data.vector.heap.HeapBytesVector;
import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
import org.apache.flink.table.data.vector.heap.HeapFloatVector;
import org.apache.flink.table.data.vector.heap.HeapIntVector;
import org.apache.flink.table.data.vector.heap.HeapLongVector;
import org.apache.flink.table.data.vector.heap.HeapShortVector;
import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Array {@link ColumnReader}.
*/
public class ArrayColumnReader extends BaseVectorizedColumnReader {
// The value read last time
private Object lastValue;
// flag to indicate that there is no more data in the parquet data page
private boolean eof = false;
// flag to indicate whether this instance is reading its first parquet data page
boolean isFirstRow = true;
public ArrayColumnReader(
ColumnDescriptor descriptor,
PageReader pageReader,
boolean isUtcTimestamp,
Type type,
LogicalType logicalType)
throws IOException {
super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
}
@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapArrayVector lcv = (HeapArrayVector) vector;
// before readBatch, initialize offsets & lengths to the default size;
// the actual size will be assigned in setChildrenInfo() after reading completes.
lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
// Because the length of ListColumnVector.child can't be known yet,
// valueList temporarily holds all data for the ListColumnVector.
List<Object> valueList = new ArrayList<>();
LogicalType category = ((ArrayType) logicalType).getElementType();
// read the first row in the parquet data page; this happens only once for this
// instance
if (isFirstRow) {
if (!fetchNextValue(category)) {
return;
}
isFirstRow = false;
}
int index = collectDataFromParquetPage(readNumber, lcv, valueList, category);
// Convert valueList to array for the ListColumnVector.child
fillColumnVector(category, lcv, valueList, index);
}
/**
* Reads a single value from the parquet page and puts it into lastValue. Returns a boolean
* indicating whether there are more values to read (true if so).
*
* @param category
* @return boolean
* @throws IOException
*/
private boolean fetchNextValue(LogicalType category) throws IOException {
int left = readPageIfNeed();
if (left > 0) {
// get the values of repetition and definitionLevel
readRepetitionAndDefinitionLevels();
// read the data if it isn't null
if (definitionLevel == maxDefLevel) {
if (isCurrentPageDictionaryEncoded) {
lastValue = dataColumn.readValueDictionaryId();
} else {
lastValue = readPrimitiveTypedRow(category);
}
} else {
lastValue = null;
}
return true;
} else {
eof = true;
return false;
}
}
private int readPageIfNeed() throws IOException {
// Compute the number of values we want to read in this page.
int leftInPage = (int) (endOfPageValueCount - valuesRead);
if (leftInPage == 0) {
// no data left in current page, load data from new page
readPage();
leftInPage = (int) (endOfPageValueCount - valuesRead);
}
return leftInPage;
}
// Needs to be consistent with VectorizedPrimitiveColumnReader#readBatchHelper
// TODO Reduce the duplicated code
private Object readPrimitiveTypedRow(LogicalType category) {
switch (category.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
return dataColumn.readString();
case BOOLEAN:
return dataColumn.readBoolean();
case TIME_WITHOUT_TIME_ZONE:
case DATE:
case INTEGER:
return dataColumn.readInteger();
case TINYINT:
return dataColumn.readTinyInt();
case SMALLINT:
return dataColumn.readSmallInt();
case BIGINT:
return dataColumn.readLong();
case FLOAT:
return dataColumn.readFloat();
case DOUBLE:
return dataColumn.readDouble();
case DECIMAL:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return dataColumn.readInteger();
case INT64:
return dataColumn.readLong();
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
return dataColumn.readString();
default:
throw new AssertionError();
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return dataColumn.readTimestamp();
default:
throw new RuntimeException("Unsupported type in the list: " + type);
}
}
private Object dictionaryDecodeValue(LogicalType category, Integer dictionaryValue) {
if (dictionaryValue == null) {
return null;
}
switch (category.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
return dictionary.readString(dictionaryValue);
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case INTEGER:
return dictionary.readInteger(dictionaryValue);
case BOOLEAN:
return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
case DOUBLE:
return dictionary.readDouble(dictionaryValue);
case FLOAT:
return dictionary.readFloat(dictionaryValue);
case TINYINT:
return dictionary.readTinyInt(dictionaryValue);
case SMALLINT:
return dictionary.readSmallInt(dictionaryValue);
case BIGINT:
return dictionary.readLong(dictionaryValue);
case DECIMAL:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return dictionary.readInteger(dictionaryValue);
case INT64:
return dictionary.readLong(dictionaryValue);
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return dictionary.readString(dictionaryValue);
default:
throw new AssertionError();
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return dictionary.readTimestamp(dictionaryValue);
default:
throw new RuntimeException("Unsupported type in the list: " + type);
}
}
/**
* Collects data from a parquet page and returns the final row index where it stopped. The
* returned index can be equal to or less than total.
*
* @param total maximum number of rows to collect
* @param lcv column vector to do initial setup in data collection time
* @param valueList collection of values that will be fed into the vector later
* @param category
* @return int
* @throws IOException
*/
private int collectDataFromParquetPage(
int total, HeapArrayVector lcv, List<Object> valueList, LogicalType category)
throws IOException {
int index = 0;
/*
* Here is a nested loop for collecting all values from a parquet page.
* A column of array type can be considered as a list of lists, so the two loops are as below:
* 1. The outer loop iterates on rows (index is a row index, so points to a row in the batch), e.g.:
* [0, 2, 3] <- index: 0
* [NULL, 3, 4] <- index: 1
*
* 2. The inner loop iterates on values within a row (sets all data from parquet data page
* for an element in ListColumnVector), so fetchNextValue returns values one-by-one:
* 0, 2, 3, NULL, 3, 4
*
* As described below, the repetition level (repetitionLevel != 0)
* can be used to decide when we'll start to read values for the next list.
*/
while (!eof && index < total) {
// add element to ListColumnVector one by one
lcv.offsets[index] = valueList.size();
/*
* Let's collect all values for a single list.
* Repetition level = 0 means that a new list started there in the parquet page,
* in that case, let's exit from the loop, and start to collect value for a new list.
*/
do {
/*
* Definition level = 0 when a NULL value was returned instead of a list
* (this is not the same as a NULL value inside a list).
*/
if (definitionLevel == 0) {
lcv.setNullAt(index);
}
valueList.add(
isCurrentPageDictionaryEncoded
? dictionaryDecodeValue(category, (Integer) lastValue)
: lastValue);
} while (fetchNextValue(category) && (repetitionLevel != 0));
lcv.lengths[index] = valueList.size() - lcv.offsets[index];
index++;
}
return index;
}
/**
* The lengths & offsets are initialized with the default size (1024); they should be resized
* to the actual size according to the element number.
*/
private void setChildrenInfo(HeapArrayVector lcv, int itemNum, int elementNum) {
lcv.setSize(itemNum);
long[] lcvLength = new long[elementNum];
long[] lcvOffset = new long[elementNum];
System.arraycopy(lcv.lengths, 0, lcvLength, 0, elementNum);
System.arraycopy(lcv.offsets, 0, lcvOffset, 0, elementNum);
lcv.lengths = lcvLength;
lcv.offsets = lcvOffset;
}
private void fillColumnVector(
LogicalType category, HeapArrayVector lcv, List valueList, int elementNum) {
int total = valueList.size();
setChildrenInfo(lcv, total, elementNum);
switch (category.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
lcv.child = new HeapBytesVector(total);
((HeapBytesVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
byte[] src = ((List<byte[]>) valueList).get(i);
if (src == null) {
((HeapBytesVector) lcv.child).setNullAt(i);
} else {
((HeapBytesVector) lcv.child).appendBytes(i, src, 0, src.length);
}
}
break;
case BOOLEAN:
lcv.child = new HeapBooleanVector(total);
((HeapBooleanVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapBooleanVector) lcv.child).setNullAt(i);
} else {
((HeapBooleanVector) lcv.child).vector[i] =
((List<Boolean>) valueList).get(i);
}
}
break;
case TINYINT:
lcv.child = new HeapByteVector(total);
((HeapByteVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapByteVector) lcv.child).setNullAt(i);
} else {
((HeapByteVector) lcv.child).vector[i] =
(byte) ((List<Integer>) valueList).get(i).intValue();
}
}
break;
case SMALLINT:
lcv.child = new HeapShortVector(total);
((HeapShortVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapShortVector) lcv.child).setNullAt(i);
} else {
((HeapShortVector) lcv.child).vector[i] =
(short) ((List<Integer>) valueList).get(i).intValue();
}
}
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
lcv.child = new HeapIntVector(total);
((HeapIntVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapIntVector) lcv.child).setNullAt(i);
} else {
((HeapIntVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
}
}
break;
case FLOAT:
lcv.child = new HeapFloatVector(total);
((HeapFloatVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapFloatVector) lcv.child).setNullAt(i);
} else {
((HeapFloatVector) lcv.child).vector[i] = ((List<Float>) valueList).get(i);
}
}
break;
case BIGINT:
lcv.child = new HeapLongVector(total);
((HeapLongVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapLongVector) lcv.child).setNullAt(i);
} else {
((HeapLongVector) lcv.child).vector[i] = ((List<Long>) valueList).get(i);
}
}
break;
case DOUBLE:
lcv.child = new HeapDoubleVector(total);
((HeapDoubleVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapDoubleVector) lcv.child).setNullAt(i);
} else {
((HeapDoubleVector) lcv.child).vector[i] =
((List<Double>) valueList).get(i);
}
}
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
lcv.child = new HeapTimestampVector(total);
((HeapTimestampVector) lcv.child).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapTimestampVector) lcv.child).setNullAt(i);
} else {
((HeapTimestampVector) lcv.child)
.setTimestamp(i, ((List<TimestampData>) valueList).get(i));
}
}
break;
case DECIMAL:
PrimitiveType.PrimitiveTypeName primitiveTypeName =
descriptor.getPrimitiveType().getPrimitiveTypeName();
switch (primitiveTypeName) {
case INT32:
lcv.child = new ParquetDecimalVector(new HeapIntVector(total));
((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector)
.setNullAt(i);
} else {
((HeapIntVector) ((ParquetDecimalVector) lcv.child).vector)
.vector[i] =
((List<Integer>) valueList).get(i);
}
}
break;
case INT64:
lcv.child = new ParquetDecimalVector(new HeapLongVector(total));
((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector).reset();
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector)
.setNullAt(i);
} else {
((HeapLongVector) ((ParquetDecimalVector) lcv.child).vector)
.vector[i] =
((List<Long>) valueList).get(i);
}
}
break;
default:
lcv.child = new ParquetDecimalVector(new HeapBytesVector(total));
((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector).reset();
for (int i = 0; i < valueList.size(); i++) {
byte[] src = ((List<byte[]>) valueList).get(i);
if (valueList.get(i) == null) {
((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector)
.setNullAt(i);
} else {
((HeapBytesVector) ((ParquetDecimalVector) lcv.child).vector)
.appendBytes(i, src, 0, src.length);
}
}
break;
}
break;
default:
throw new RuntimeException("Unsupported type in the list: " + type);
}
}
}

View File

@@ -0,0 +1,313 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.parquet.column.ValuesType.VALUES;
/**
* Abstract {@link ColumnReader}. Part of the code is adapted from Apache Hive and Apache Parquet.
*/
public abstract class BaseVectorizedColumnReader implements ColumnReader<WritableColumnVector> {
private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class);
protected boolean isUtcTimestamp;
/**
* Total number of values read.
*/
protected long valuesRead;
/**
* value that indicates the end of the current page. That is, if valuesRead ==
* endOfPageValueCount, we are at the end of the page.
*/
protected long endOfPageValueCount;
/**
* The dictionary, if this column has dictionary encoding.
*/
protected final ParquetDataColumnReader dictionary;
/**
* If true, the current page is dictionary encoded.
*/
protected boolean isCurrentPageDictionaryEncoded;
/**
* Maximum definition level for this column.
*/
protected final int maxDefLevel;
protected int definitionLevel;
protected int repetitionLevel;
/**
* Repetition/Definition/Value readers.
*/
protected IntIterator repetitionLevelColumn;
protected IntIterator definitionLevelColumn;
protected ParquetDataColumnReader dataColumn;
/**
* Total values in the current page.
*/
protected int pageValueCount;
protected final PageReader pageReader;
protected final ColumnDescriptor descriptor;
protected final Type type;
protected final LogicalType logicalType;
public BaseVectorizedColumnReader(
ColumnDescriptor descriptor,
PageReader pageReader,
boolean isUtcTimestamp,
Type parquetType,
LogicalType logicalType)
throws IOException {
this.descriptor = descriptor;
this.type = parquetType;
this.pageReader = pageReader;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
this.isUtcTimestamp = isUtcTimestamp;
this.logicalType = logicalType;
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
this.dictionary =
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
parquetType.asPrimitiveType(),
dictionaryPage
.getEncoding()
.initDictionary(descriptor, dictionaryPage),
isUtcTimestamp);
this.isCurrentPageDictionaryEncoded = true;
} catch (IOException e) {
throw new IOException("could not decode the dictionary for " + descriptor, e);
}
} else {
this.dictionary = null;
this.isCurrentPageDictionaryEncoded = false;
}
}
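/**
 * Advances to the next value: reads one repetition level and one definition level
 * and increments the running count of values consumed from the current page.
 */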
protected void readRepetitionAndDefinitionLevels() {
repetitionLevel = repetitionLevelColumn.nextInt();
definitionLevel = definitionLevelColumn.nextInt();
valuesRead++;
}
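/**
 * Reads the next data page (V1 or V2) from the page reader and initializes the
 * repetition/definition level iterators and the data column reader for it.
 */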
protected void readPage() throws IOException {
DataPage page = pageReader.readPage();
if (page == null) {
return;
}
page.accept(
new DataPage.Visitor<Void>() {
@Override
public Void visit(DataPageV1 dataPageV1) {
readPageV1(dataPageV1);
return null;
}
@Override
public Void visit(DataPageV2 dataPageV2) {
readPageV2(dataPageV2);
return null;
}
});
}
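/**
 * Initializes the data column reader for the current page: a dictionary-based values reader
 * when the page encoding uses the dictionary, otherwise a plain values reader for the encoding.
 */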
private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount)
throws IOException {
this.pageValueCount = valueCount;
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
if (dictionary == null) {
throw new IOException(
"could not read page in col "
+ descriptor
+ " as the dictionary was missing for encoding "
+ dataEncoding);
}
dataColumn =
ParquetDataColumnReaderFactory.getDataColumnReaderByType(
type.asPrimitiveType(),
dataEncoding.getDictionaryBasedValuesReader(
descriptor, VALUES, dictionary.getDictionary()),
isUtcTimestamp);
this.isCurrentPageDictionaryEncoded = true;
} else {
dataColumn =
ParquetDataColumnReaderFactory.getDataColumnReaderByType(
type.asPrimitiveType(),
dataEncoding.getValuesReader(descriptor, VALUES),
isUtcTimestamp);
this.isCurrentPageDictionaryEncoded = false;
}
try {
dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
}
private void readPageV1(DataPageV1 page) {
ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
BytesInput bytes = page.getBytes();
LOG.debug("page size " + bytes.size() + " bytes and " + pageValueCount + " records");
ByteBufferInputStream in = bytes.toInputStream();
LOG.debug("reading repetition levels at " + in.position());
rlReader.initFromPage(pageValueCount, in);
LOG.debug("reading definition levels at " + in.position());
dlReader.initFromPage(pageValueCount, in);
LOG.debug("reading data at " + in.position());
initDataReader(page.getValueEncoding(), in, page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException(
"could not read page " + page + " in col " + descriptor, e);
}
}
private void readPageV2(DataPageV2 page) {
this.pageValueCount = page.getValueCount();
this.repetitionLevelColumn =
newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels());
this.definitionLevelColumn =
newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
try {
LOG.debug(
"page data size "
+ page.getData().size()
+ " bytes and "
+ pageValueCount
+ " records");
initDataReader(
page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException(
"could not read page " + page + " in col " + descriptor, e);
}
}
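/**
 * Builds an iterator over RLE/bit-packed encoded levels; a max level of 0 means the level is
 * always 0, so nothing is stored and a constant-zero iterator is returned.
 */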
private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
try {
if (maxLevel == 0) {
return new NullIntIterator();
}
return new RLEIntIterator(
new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel),
new ByteArrayInputStream(bytes.toByteArray())));
} catch (IOException e) {
throw new ParquetDecodingException(
"could not read levels in page for col " + descriptor, e);
}
}
/**
* Utility classes to abstract over the different ways of reading ints with different encodings.
*/
abstract static class IntIterator {
abstract int nextInt();
}
/**
* Reads ints from {@link ValuesReader}.
*/
protected static final class ValuesReaderIntIterator extends IntIterator {
ValuesReader delegate;
public ValuesReaderIntIterator(ValuesReader delegate) {
this.delegate = delegate;
}
@Override
int nextInt() {
return delegate.readInteger();
}
}
/**
* Reads ints from {@link RunLengthBitPackingHybridDecoder}.
*/
protected static final class RLEIntIterator extends IntIterator {
RunLengthBitPackingHybridDecoder delegate;
public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
this.delegate = delegate;
}
@Override
int nextInt() {
try {
return delegate.readInt();
} catch (IOException e) {
throw new ParquetDecodingException(e);
}
}
}
/**
* Returns zero; used when the max level is 0 and no levels are encoded.
*/
protected static final class NullIntIterator extends IntIterator {
@Override
int nextInt() {
return 0;
}
}
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.table.data.vector.writable.WritableBytesVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.data.vector.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Fixed length bytes {@code ColumnReader}, just for decimal.
*
* <p>Note: Reference Flink release 1.13.2
* {@code org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader}
* to always write as legacy decimal format.
*/
public class FixedLenBytesColumnReader<V extends WritableColumnVector>
extends AbstractColumnReader<V> {
public FixedLenBytesColumnReader(
ColumnDescriptor descriptor, PageReader pageReader, int precision) throws IOException {
super(descriptor, pageReader);
checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
}
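/**
 * Reads {@code num} fixed-length binary values into the bytes vector, using the definition
 * level to decide whether each slot holds a value or a null.
 */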
@Override
protected void readBatch(int rowId, int num, V column) {
int bytesLen = descriptor.getPrimitiveType().getTypeLength();
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
byte[] bytes = readDataBinary(bytesLen).getBytes();
bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
} else {
bytesVector.setNullAt(rowId + i);
}
}
}
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, V column, WritableIntVector dictionaryIds) {
WritableBytesVector bytesVector = (WritableBytesVector) column;
for (int i = rowId; i < rowId + num; ++i) {
if (!bytesVector.isNullAt(i)) {
byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytes();
bytesVector.appendBytes(i, v, 0, v.length);
}
}
}
private Binary readDataBinary(int len) {
ByteBuffer buffer = readDataBuffer(len);
if (buffer.hasArray()) {
return Binary.fromConstantByteArray(
buffer.array(), buffer.arrayOffset() + buffer.position(), len);
} else {
byte[] bytes = new byte[len];
buffer.get(bytes);
return Binary.fromConstantByteArray(bytes);
}
}
}

View File

@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.writable.WritableIntVector;
import org.apache.flink.table.data.vector.writable.WritableTimestampVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
/**
* Timestamp {@link org.apache.flink.formats.parquet.vector.reader.ColumnReader} for INT64 (8-byte) columns.
* TIMESTAMP_MILLIS is the deprecated ConvertedType counterpart of a TIMESTAMP logical type
* that is UTC normalized and has MILLIS precision.
*
* <p>See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp;
* TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedTypes.
*/
public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTimestampVector> {
private final boolean utcTimestamp;
private final ChronoUnit chronoUnit;
public Int64TimestampColumnReader(
boolean utcTimestamp,
ColumnDescriptor descriptor,
PageReader pageReader,
int precision) throws IOException {
super(descriptor, pageReader);
this.utcTimestamp = utcTimestamp;
if (precision <= 3) {
this.chronoUnit = ChronoUnit.MILLIS;
} else if (precision <= 6) {
this.chronoUnit = ChronoUnit.MICROS;
} else {
throw new IllegalArgumentException(
"Unsupported TIMESTAMP type with precision: "
+ precision
+ ", the INT64 timestamp reader only supports precision less than or equal to 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
@Override
protected boolean supportLazyDecode() {
return false;
}
@Override
protected void readBatch(int rowId, int num, WritableTimestampVector column) {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
ByteBuffer buffer = readDataBuffer(8);
column.setTimestamp(rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong(), chronoUnit));
} else {
column.setNullAt(rowId + i);
}
}
}
@Override
protected void readBatchFromDictionaryIds(
int rowId,
int num,
WritableTimestampVector column,
WritableIntVector dictionaryIds) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.setTimestamp(i, decodeInt64ToTimestamp(
utcTimestamp, dictionary, dictionaryIds.getInt(i), chronoUnit));
}
}
}
public static TimestampData decodeInt64ToTimestamp(
boolean utcTimestamp,
org.apache.parquet.column.Dictionary dictionary,
int id,
ChronoUnit unit) {
long value = dictionary.decodeToLong(id);
return int64ToTimestamp(utcTimestamp, value, unit);
}
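/**
 * Converts an INT64 epoch offset (millis or micros, depending on the column precision) into
 * {@link TimestampData}; in non-UTC mode the value goes through {@link Timestamp} so the local
 * time zone is applied.
 */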
private static TimestampData int64ToTimestamp(
boolean utcTimestamp,
long interval,
ChronoUnit unit) {
final Instant instant = Instant.EPOCH.plus(interval, unit);
if (utcTimestamp) {
return TimestampData.fromInstant(instant);
} else {
// this applies the local timezone
return TimestampData.fromTimestamp(Timestamp.from(instant));
}
}
}

View File

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import java.io.IOException;
/**
* Map {@link ColumnReader}.
*/
public class MapColumnReader implements ColumnReader<WritableColumnVector> {
private final LogicalType logicalType;
private final ArrayColumnReader keyReader;
private final ArrayColumnReader valueReader;
public MapColumnReader(
ArrayColumnReader keyReader, ArrayColumnReader valueReader, LogicalType logicalType) {
this.keyReader = keyReader;
this.valueReader = valueReader;
this.logicalType = logicalType;
}
public void readBatch(int total, ColumnVector column) throws IOException {
HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) column;
MapType mapType = (MapType) logicalType;
// initialize two HeapArrayVectors for the keys and the values
HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total);
HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total);
// read the keys and values
keyReader.readToVector(total, keyArrayColumnVector);
valueReader.readToVector(total, valueArrayColumnVector);
// set the related attributes according to the keys and values
mapColumnVector.setKeys(keyArrayColumnVector.child);
mapColumnVector.setValues(valueArrayColumnVector.child);
mapColumnVector.setOffsets(keyArrayColumnVector.offsets);
mapColumnVector.setLengths(keyArrayColumnVector.lengths);
mapColumnVector.setSize(keyArrayColumnVector.getSize());
for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
if (keyArrayColumnVector.isNullAt(i)) {
mapColumnVector.setNullAt(i);
}
}
}
@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
readBatch(readNumber, vector);
}
}

View File

@@ -0,0 +1,385 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.ColumnarRowData;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.IntStream;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
* This reader is used to read a {@link VectorizedColumnBatch} from input split.
*
* <p>Note: Reference Flink release 1.11.2
* {@code org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader}
* because it is package scope.
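*
* <p>Typical read loop (a sketch): {@code while (!reader.reachedEnd()) { process(reader.nextRecord()); }}
* followed by {@code reader.close()}; {@code process} here is just a placeholder for the caller's row handling.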
*/
public class ParquetColumnarRowSplitReader implements Closeable {
private final boolean utcTimestamp;
private final MessageType fileSchema;
private final LogicalType[] requestedTypes;
private final MessageType requestedSchema;
/**
* The total number of rows this RecordReader will eventually read. The sum of the rows of all
* the row groups.
*/
private final long totalRowCount;
private final WritableColumnVector[] writableVectors;
private final VectorizedColumnBatch columnarBatch;
private final ColumnarRowData row;
private final int batchSize;
private ParquetFileReader reader;
/**
* For each requested column, the reader to read this column. This is NULL if this column is
* missing from the file, in which case we populate the attribute with NULL.
*/
private ColumnReader[] columnReaders;
/**
* The number of rows that have been returned.
*/
private long rowsReturned;
/**
* The number of rows that have been loaded so far, including the current in-flight row group.
*/
private long totalCountLoadedSoFar;
// the index of the next row to return
private int nextRow;
// the number of rows in the current batch
private int rowsInBatch;
public ParquetColumnarRowSplitReader(
boolean utcTimestamp,
boolean caseSensitive,
Configuration conf,
LogicalType[] selectedTypes,
String[] selectedFieldNames,
ColumnBatchGenerator generator,
int batchSize,
Path path,
long splitStart,
long splitLength) throws IOException {
this.utcTimestamp = utcTimestamp;
this.batchSize = batchSize;
// apply the predicate push-down filter to select the row groups to read
ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
MessageType fileSchema = footer.getFileMetaData().getSchema();
FilterCompat.Filter filter = getFilter(conf);
List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
this.fileSchema = footer.getFileMetaData().getSchema();
Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);
this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
this.reader = new ParquetFileReader(
conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
long totalRowCount = 0;
for (BlockMetaData block : blocks) {
totalRowCount += block.getRowCount();
}
this.totalRowCount = totalRowCount;
this.nextRow = 0;
this.rowsInBatch = 0;
this.rowsReturned = 0;
checkSchema();
this.writableVectors = createWritableVectors();
ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
this.columnarBatch = generator.generate(columnVectors);
this.row = new ColumnarRowData(columnarBatch);
}
/**
* Patches the given vectors with nulls: any vector position that is not requested (or not read
* from the file) is left as null.
*
* @param fields The total number of selected fields
* @param vectors The readable vectors
* @param indices The requested indices from the selected fields
*/
private static ColumnVector[] patchedVector(int fields, ColumnVector[] vectors, int[] indices) {
ColumnVector[] patched = new ColumnVector[fields];
for (int i = 0; i < indices.length; i++) {
patched[indices[i]] = vectors[i];
}
return patched;
}
/**
* Clips `parquetSchema` according to `fieldNames`.
*/
private static Type[] clipParquetSchema(
GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) {
Type[] types = new Type[fieldNames.length];
if (caseSensitive) {
for (int i = 0; i < fieldNames.length; ++i) {
String fieldName = fieldNames[i];
types[i] = parquetSchema.containsField(fieldName) ? parquetSchema.getType(fieldName) : null;
}
} else {
Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
for (Type type : parquetSchema.getFields()) {
caseInsensitiveFieldMap.compute(type.getName().toLowerCase(Locale.ROOT),
(key, previousType) -> {
if (previousType != null) {
throw new FlinkRuntimeException(
"Parquet with case insensitive mode should have no duplicate key: " + key);
}
return type;
});
}
for (int i = 0; i < fieldNames.length; ++i) {
Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT));
// TODO clip for array, map, row types.
types[i] = type;
}
}
return types;
}
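/**
 * Creates one writable vector per requested field, sized to the batch size, based on the
 * requested logical types and the Parquet column descriptors.
 */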
private WritableColumnVector[] createWritableVectors() {
WritableColumnVector[] columns = new WritableColumnVector[requestedTypes.length];
List<Type> types = requestedSchema.getFields();
List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
for (int i = 0; i < requestedTypes.length; i++) {
columns[i] = createWritableColumnVector(
batchSize,
requestedTypes[i],
types.get(i),
descriptors);
}
return columns;
}
/**
* Create readable vectors from writable vectors.
* Especially for decimal, see {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}.
*/
private ColumnVector[] createReadableVectors() {
ColumnVector[] vectors = new ColumnVector[writableVectors.length];
for (int i = 0; i < writableVectors.length; i++) {
vectors[i] = requestedTypes[i].getTypeRoot() == LogicalTypeRoot.DECIMAL
? new ParquetDecimalVector(writableVectors[i])
: writableVectors[i];
}
return vectors;
}
private void checkSchema() throws IOException, UnsupportedOperationException {
/*
* Check that the requested schema is supported.
*/
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
String[] colPath = requestedSchema.getPaths().get(i);
if (fileSchema.containsPath(colPath)) {
ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
if (!fd.equals(requestedSchema.getColumns().get(i))) {
throw new UnsupportedOperationException("Schema evolution not supported.");
}
} else {
if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
// Column is missing in data but the required data is non-nullable. This file is invalid.
throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath));
}
}
}
}
/**
* Method used to check if the end of the input is reached.
*
* @return True if the end is reached, otherwise false.
* @throws IOException Thrown, if an I/O error occurred.
*/
public boolean reachedEnd() throws IOException {
return !ensureBatch();
}
public ColumnarRowData nextRecord() {
// return the next row
row.setRowId(this.nextRow++);
return row;
}
/**
* Checks if there is at least one row left in the batch to return. If no more rows are
* available, it reads another batch of rows.
*
* @return Returns true if there is one more row to return, false otherwise.
* @throws IOException throw if an exception happens while reading a batch.
*/
private boolean ensureBatch() throws IOException {
if (nextRow >= rowsInBatch) {
// No more rows available in the Rows array.
nextRow = 0;
// Try to read the next batch of rows from the file.
return nextBatch();
}
// there is at least one Row left in the Rows array.
return true;
}
/**
* Advances to the next batch of rows. Returns false if there are no more.
*/
private boolean nextBatch() throws IOException {
for (WritableColumnVector v : writableVectors) {
v.reset();
}
columnarBatch.setNumRows(0);
if (rowsReturned >= totalRowCount) {
return false;
}
if (rowsReturned == totalCountLoadedSoFar) {
readNextRowGroup();
}
int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
//noinspection unchecked
columnReaders[i].readToVector(num, writableVectors[i]);
}
rowsReturned += num;
columnarBatch.setNumRows(num);
rowsInBatch = num;
return true;
}
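/**
 * Loads the next row group from the file and rebuilds the per-column readers for it.
 */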
private void readNextRowGroup() throws IOException {
PageReadStore pages = reader.readNextRowGroup();
if (pages == null) {
throw new IOException("expecting more rows but reached last block. Read "
+ rowsReturned + " out of " + totalRowCount);
}
List<Type> types = requestedSchema.getFields();
List<ColumnDescriptor> columns = requestedSchema.getColumns();
columnReaders = new ColumnReader[types.size()];
for (int i = 0; i < types.size(); ++i) {
columnReaders[i] = createColumnReader(
utcTimestamp,
requestedTypes[i],
types.get(i),
columns,
pages);
}
totalCountLoadedSoFar += pages.getRowCount();
}
/**
* Seek to a particular row number.
*/
public void seekToRow(long rowCount) throws IOException {
if (totalCountLoadedSoFar != 0) {
throw new UnsupportedOperationException("Seek is only supported before any rows are read.");
}
List<BlockMetaData> blockMetaData = reader.getRowGroups();
for (BlockMetaData metaData : blockMetaData) {
if (metaData.getRowCount() > rowCount) {
break;
} else {
reader.skipNextRowGroup();
rowsReturned += metaData.getRowCount();
totalCountLoadedSoFar += metaData.getRowCount();
rowsInBatch = (int) metaData.getRowCount();
nextRow = (int) metaData.getRowCount();
rowCount -= metaData.getRowCount();
}
}
for (int i = 0; i < rowCount; i++) {
boolean end = reachedEnd();
if (end) {
throw new RuntimeException("Seek too many rows.");
}
nextRecord();
}
}
@Override
public void close() throws IOException {
if (reader != null) {
reader.close();
reader = null;
}
}
/**
* Interface to generate a {@link VectorizedColumnBatch}.
*/
public interface ColumnBatchGenerator {
VectorizedColumnBatch generate(ColumnVector[] readVectors);
}
}

View File

@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.table.data.TimestampData;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.Dictionary;
import java.io.IOException;
/**
* The interface that wraps the underlying Parquet dictionary and non-dictionary-encoded page readers.
*/
public interface ParquetDataColumnReader {
/**
* Initialize the reader by page data.
*
* @param valueCount value count
* @param in page data
* @throws IOException
*/
void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
/**
* @return the next Dictionary ID from the page
*/
int readValueDictionaryId();
/**
* @return the next Long from the page
*/
long readLong();
/**
* @return the next Integer from the page
*/
int readInteger();
/**
* @return the next SmallInt from the page
*/
int readSmallInt();
/**
* @return the next TinyInt from the page
*/
int readTinyInt();
/**
* @return the next Float from the page
*/
float readFloat();
/**
* @return the next Boolean from the page
*/
boolean readBoolean();
/**
* @return the next String from the page
*/
byte[] readString();
/**
* @return the next Varchar from the page
*/
byte[] readVarchar();
/**
* @return the next Char from the page
*/
byte[] readChar();
/**
* @return the next Bytes from the page
*/
byte[] readBytes();
/**
* @return the next Decimal from the page
*/
byte[] readDecimal();
/**
* @return the next Double from the page
*/
double readDouble();
/**
* @return the next TimestampData from the page
*/
TimestampData readTimestamp();
/**
* @return is data valid
*/
boolean isValid();
/**
* @return the underlying dictionary if current reader is dictionary encoded
*/
Dictionary getDictionary();
/**
* @param id in dictionary
* @return the Bytes from the dictionary by id
*/
byte[] readBytes(int id);
/**
* @param id in dictionary
* @return the Float from the dictionary by id
*/
float readFloat(int id);
/**
* @param id in dictionary
* @return the Double from the dictionary by id
*/
double readDouble(int id);
/**
* @param id in dictionary
* @return the Integer from the dictionary by id
*/
int readInteger(int id);
/**
* @param id in dictionary
* @return the Long from the dictionary by id
*/
long readLong(int id);
/**
* @param id in dictionary
* @return the Small Int from the dictionary by id
*/
int readSmallInt(int id);
/**
* @param id in dictionary
* @return the tiny int from the dictionary by id
*/
int readTinyInt(int id);
/**
* @param id in dictionary
* @return the Boolean from the dictionary by id
*/
boolean readBoolean(int id);
/**
* @param id in dictionary
* @return the Decimal from the dictionary by id
*/
byte[] readDecimal(int id);
/**
* @param id in dictionary
* @return the TimestampData from the dictionary by id
*/
TimestampData readTimestamp(int id);
/**
* @param id in dictionary
* @return the String from the dictionary by id
*/
byte[] readString(int id);
/**
* @param id in dictionary
* @return the Varchar from the dictionary by id
*/
byte[] readVarchar(int id);
/**
* @param id in dictionary
* @return the Char from the dictionary by id
*/
byte[] readChar(int id);
}

View File

@@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.table.data.TimestampData;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Timestamp;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND;
/**
* Parquet file has self-describing schema which may differ from the user required schema (e.g.
* schema evolution). This factory is used to retrieve user required typed data via corresponding
* reader which reads the underlying data.
*/
public final class ParquetDataColumnReaderFactory {
private ParquetDataColumnReaderFactory() {
}
/**
* Default reader for {@link ParquetDataColumnReader}.
*/
public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
protected ValuesReader valuesReader;
protected Dictionary dict;
// After the data is read in the parquet type, isValid will be set to true if the data can
// be returned in the type defined in HMS. Otherwise isValid is set to false.
boolean isValid = true;
public DefaultParquetDataColumnReader(ValuesReader valuesReader) {
this.valuesReader = valuesReader;
}
public DefaultParquetDataColumnReader(Dictionary dict) {
this.dict = dict;
}
@Override
public void initFromPage(int i, ByteBufferInputStream in) throws IOException {
valuesReader.initFromPage(i, in);
}
@Override
public boolean readBoolean() {
return valuesReader.readBoolean();
}
@Override
public boolean readBoolean(int id) {
return dict.decodeToBoolean(id);
}
@Override
public byte[] readString(int id) {
return dict.decodeToBinary(id).getBytesUnsafe();
}
@Override
public byte[] readString() {
return valuesReader.readBytes().getBytesUnsafe();
}
@Override
public byte[] readVarchar() {
// we need to enforce the size here even if the types are the same
return valuesReader.readBytes().getBytesUnsafe();
}
@Override
public byte[] readVarchar(int id) {
return dict.decodeToBinary(id).getBytesUnsafe();
}
@Override
public byte[] readChar() {
return valuesReader.readBytes().getBytesUnsafe();
}
@Override
public byte[] readChar(int id) {
return dict.decodeToBinary(id).getBytesUnsafe();
}
@Override
public byte[] readBytes() {
return valuesReader.readBytes().getBytesUnsafe();
}
@Override
public byte[] readBytes(int id) {
return dict.decodeToBinary(id).getBytesUnsafe();
}
@Override
public byte[] readDecimal() {
return valuesReader.readBytes().getBytesUnsafe();
}
@Override
public byte[] readDecimal(int id) {
return dict.decodeToBinary(id).getBytesUnsafe();
}
@Override
public float readFloat() {
return valuesReader.readFloat();
}
@Override
public float readFloat(int id) {
return dict.decodeToFloat(id);
}
@Override
public double readDouble() {
return valuesReader.readDouble();
}
@Override
public double readDouble(int id) {
return dict.decodeToDouble(id);
}
@Override
public TimestampData readTimestamp() {
throw new RuntimeException("Unsupported operation");
}
@Override
public TimestampData readTimestamp(int id) {
throw new RuntimeException("Unsupported operation");
}
@Override
public int readInteger() {
return valuesReader.readInteger();
}
@Override
public int readInteger(int id) {
return dict.decodeToInt(id);
}
@Override
public boolean isValid() {
return isValid;
}
@Override
public long readLong(int id) {
return dict.decodeToLong(id);
}
@Override
public long readLong() {
return valuesReader.readLong();
}
@Override
public int readSmallInt() {
return valuesReader.readInteger();
}
@Override
public int readSmallInt(int id) {
return dict.decodeToInt(id);
}
@Override
public int readTinyInt() {
return valuesReader.readInteger();
}
@Override
public int readTinyInt(int id) {
return dict.decodeToInt(id);
}
@Override
public int readValueDictionaryId() {
return valuesReader.readValueDictionaryId();
}
public void skip() {
valuesReader.skip();
}
@Override
public Dictionary getDictionary() {
return dict;
}
}
/**
* The reader that reads the underlying INT96 timestamp value.
*/
public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
private final boolean isUtcTimestamp;
public TypesFromInt96PageReader(ValuesReader realReader, boolean isUtcTimestamp) {
super(realReader);
this.isUtcTimestamp = isUtcTimestamp;
}
public TypesFromInt96PageReader(Dictionary dict, boolean isUtcTimestamp) {
super(dict);
this.isUtcTimestamp = isUtcTimestamp;
}
private TimestampData convert(Binary binary) {
ByteBuffer buf = binary.toByteBuffer();
buf.order(ByteOrder.LITTLE_ENDIAN);
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();
return int96ToTimestamp(isUtcTimestamp, timeOfDayNanos, julianDay);
}
@Override
public TimestampData readTimestamp(int id) {
return convert(dict.decodeToBinary(id));
}
@Override
public TimestampData readTimestamp() {
return convert(valuesReader.readBytes());
}
}
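/**
 * Chooses the INT96 timestamp-aware reader for INT96 columns; every other primitive type falls
 * back to the default reader, optionally backed by a dictionary.
 */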
private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
boolean isDictionary,
PrimitiveType parquetType,
Dictionary dictionary,
ValuesReader valuesReader,
boolean isUtcTimestamp) {
if (parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return isDictionary
? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
: new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
} else {
return isDictionary
? new DefaultParquetDataColumnReader(dictionary)
: new DefaultParquetDataColumnReader(valuesReader);
}
}
public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
PrimitiveType parquetType, Dictionary realReader, boolean isUtcTimestamp) {
return getDataColumnReaderByTypeHelper(true, parquetType, realReader, null, isUtcTimestamp);
}
public static ParquetDataColumnReader getDataColumnReaderByType(
PrimitiveType parquetType, ValuesReader realReader, boolean isUtcTimestamp) {
return getDataColumnReaderByTypeHelper(
false, parquetType, null, realReader, isUtcTimestamp);
}
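/**
 * Converts an INT96 value (Julian day plus nanos within the day) to {@link TimestampData};
 * UTC mode keeps nanosecond precision via epoch millis plus nano-of-millisecond, otherwise the
 * value goes through {@link Timestamp} so the local time zone is applied.
 */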
private static TimestampData int96ToTimestamp(
boolean utcTimestamp, long nanosOfDay, int julianDay) {
long millisecond = julianDayToMillis(julianDay) + (nanosOfDay / NANOS_PER_MILLISECOND);
if (utcTimestamp) {
int nanoOfMillisecond = (int) (nanosOfDay % NANOS_PER_MILLISECOND);
return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
} else {
Timestamp timestamp = new Timestamp(millisecond);
timestamp.setNanos((int) (nanosOfDay % NANOS_PER_SECOND));
return TimestampData.fromTimestamp(timestamp);
}
}
private static long julianDayToMillis(int julianDay) {
return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import java.io.IOException;
import java.util.List;
/**
* Row {@link ColumnReader}.
*/
public class RowColumnReader implements ColumnReader<WritableColumnVector> {
private final List<ColumnReader> fieldReaders;
public RowColumnReader(List<ColumnReader> fieldReaders) {
this.fieldReaders = fieldReaders;
}
@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);
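// A row is null only if every field is null at that position: seed the flag from the first
// field, then intersect it with each remaining field.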
for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
rowColumnVector.setNullAt(j);
}
}
}
}
}

View File

@@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.format.cow.vector.reader;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.data.vector.writable.WritableIntVector;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
import org.apache.parquet.io.ParquetDecodingException;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Run length decoder for data and dictionary ids.
* See https://github.com/apache/parquet-format/blob/master/Encodings.md
* See {@link RunLengthBitPackingHybridDecoder}.
*
* <p>Note: Reference Flink release 1.11.2
* {@code org.apache.flink.formats.parquet.vector.reader.RunLengthDecoder}
* because it is package scope.
*/
final class RunLengthDecoder {
/**
* If true, the bit width is fixed. This decoder is used in different places and this also
* controls if we need to read the bitwidth from the beginning of the data stream.
*/
private final boolean fixedWidth;
private final boolean readLength;
// Encoded data.
private ByteBufferInputStream in;
// bit/byte width of decoded data and utility to batch unpack them.
private int bitWidth;
private int bytesWidth;
private BytePacker packer;
// Current decoding mode and values
MODE mode;
int currentCount;
int currentValue;
// Buffer of decoded values if the values are PACKED.
int[] currentBuffer = new int[16];
int currentBufferIdx = 0;
RunLengthDecoder() {
this.fixedWidth = false;
this.readLength = false;
}
RunLengthDecoder(int bitWidth) {
this.fixedWidth = true;
this.readLength = bitWidth != 0;
initWidthAndPacker(bitWidth);
}
RunLengthDecoder(int bitWidth, boolean readLength) {
this.fixedWidth = true;
this.readLength = readLength;
initWidthAndPacker(bitWidth);
}
/**
* Init from input stream.
*/
void initFromStream(int valueCount, ByteBufferInputStream in) throws IOException {
this.in = in;
if (fixedWidth) {
// initialize for repetition and definition levels
if (readLength) {
int length = readIntLittleEndian();
this.in = in.sliceStream(length);
}
} else {
// initialize for values
if (in.available() > 0) {
initWidthAndPacker(in.read());
}
}
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
this.mode = MODE.RLE;
this.currentCount = valueCount;
this.currentValue = 0;
} else {
this.currentCount = 0;
}
}
/**
* Initializes the internal state for decoding ints of `bitWidth`.
*/
private void initWidthAndPacker(int bitWidth) {
Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
this.bitWidth = bitWidth;
this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
}
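/**
 * Reads one value, refilling from the stream when the current RLE run or bit-packed group is
 * exhausted.
 */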
int readInteger() {
if (this.currentCount == 0) {
this.readNextGroup();
}
this.currentCount--;
switch (mode) {
case RLE:
return this.currentValue;
case PACKED:
return this.currentBuffer[currentBufferIdx++];
default:
throw new AssertionError();
}
}
/**
* Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
* populated into `nulls`.
*/
void readDictionaryIds(
int total,
WritableIntVector values,
WritableColumnVector nulls,
int rowId,
int level,
RunLengthDecoder data) {
int left = total;
while (left > 0) {
if (this.currentCount == 0) {
this.readNextGroup();
}
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == level) {
data.readDictionaryIdData(n, values, rowId);
} else {
nulls.setNulls(rowId, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
values.setInt(rowId + i, data.readInteger());
} else {
nulls.setNullAt(rowId + i);
}
}
break;
default:
throw new AssertionError();
}
rowId += n;
left -= n;
currentCount -= n;
}
}
/**
* It is used to decode dictionary IDs.
*/
private void readDictionaryIdData(int total, WritableIntVector c, int rowId) {
int left = total;
while (left > 0) {
if (this.currentCount == 0) {
this.readNextGroup();
}
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
c.setInts(rowId, n, currentValue);
break;
case PACKED:
c.setInts(rowId, n, currentBuffer, currentBufferIdx);
currentBufferIdx += n;
break;
default:
throw new AssertionError();
}
rowId += n;
left -= n;
currentCount -= n;
}
}
/**
* Reads the next varint encoded int.
*/
private int readUnsignedVarInt() throws IOException {
int value = 0;
int shift = 0;
int b;
do {
b = in.read();
value |= (b & 0x7F) << shift;
shift += 7;
} while ((b & 0x80) != 0);
return value;
}
/**
* Reads the next 4 byte little endian int.
*/
private int readIntLittleEndian() throws IOException {
int ch4 = in.read();
int ch3 = in.read();
int ch2 = in.read();
int ch1 = in.read();
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
}
/**
* Reads the next byteWidth little endian int.
*/
private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
switch (bytesWidth) {
case 0:
return 0;
case 1:
return in.read();
case 2: {
int ch2 = in.read();
int ch1 = in.read();
return (ch1 << 8) + ch2;
}
case 3: {
int ch3 = in.read();
int ch2 = in.read();
int ch1 = in.read();
return (ch1 << 16) + (ch2 << 8) + ch3;
}
case 4: {
return readIntLittleEndian();
}
default:
throw new RuntimeException("Unreachable");
}
}
/**
* Reads the next group.
*/
void readNextGroup() {
try {
int header = readUnsignedVarInt();
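// The lowest bit of the header selects the mode: 0 = RLE run, 1 = bit-packed groups; the
// remaining bits carry the run length or the number of 8-value groups.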
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
case RLE:
this.currentCount = header >>> 1;
this.currentValue = readIntLittleEndianPaddedOnBitWidth();
return;
case PACKED:
int numGroups = header >>> 1;
this.currentCount = numGroups * 8;
if (this.currentBuffer.length < this.currentCount) {
this.currentBuffer = new int[this.currentCount];
}
currentBufferIdx = 0;
int valueIndex = 0;
while (valueIndex < this.currentCount) {
// values are bit packed 8 at a time, so reading bitWidth will always work
ByteBuffer buffer = in.slice(bitWidth);
this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
valueIndex += 8;
}
return;
default:
throw new ParquetDecodingException("not a valid mode " + this.mode);
}
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read from input stream", e);
}
}
enum MODE {
RLE,
PACKED
}
}