
[HUDI-1809] Flink merge on read input split uses wrong base file path for default merge type (#2846)

Author: Danny Chan
Date: 2021-04-20 21:27:09 +08:00
Committed by: GitHub
parent 9a288ccbeb
commit d6d52c6063
3 changed files with 88 additions and 12 deletions

MergeOnReadInputFormat.java

@@ -29,6 +29,7 @@ import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataProjection;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.util.StringToRowDataConverter;
@@ -63,6 +64,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
 import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
@@ -180,7 +182,7 @@ public class MergeOnReadInputFormat
           new Schema.Parser().parse(this.tableState.getAvroSchema()),
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
-          getFullSchemaReader(split.getTablePath()));
+          getFullSchemaReader(split.getBasePath().get()));
     } else {
       throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
           + "file path: " + split.getBasePath()
@@ -337,7 +339,7 @@ public class MergeOnReadInputFormat
       // efficient.
       if (split.getInstantRange().isPresent()) {
         // based on the fact that commit time is always the first field
-        String commitTime = curAvroRecord.get().get(0).toString();
+        String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString();
         if (!split.getInstantRange().get().isInRange(commitTime)) {
           // filter out the records that are not in range
           return hasNext();
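
The second one-liner replaces a magic index with a named constant. Hudi prepends metadata columns to every record, `_hoodie_commit_time` being the first, which is why the old code used index 0; both `HOODIE_COMMIT_TIME_COL_POS` and `HOODIE_RECORD_KEY_COL_POS` come from the `HoodieInputFormatUtils` imports added above. A sketch of the filter under one plausible closed-interval reading of `InstantRange` (the exact boundary semantics are elided; commit times are lexicographically comparable timestamps):

    import org.apache.avro.generic.GenericRecord;

    import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;

    public class CommitTimeFilterSketch {
      // Keep only records whose commit time lies within [start, end].
      static boolean inRange(GenericRecord record, String start, String end) {
        String commitTime = record.get(HOODIE_COMMIT_TIME_COL_POS).toString();
        return commitTime.compareTo(start) >= 0 && commitTime.compareTo(end) <= 0;
      }
    }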
@@ -431,6 +433,11 @@ public class MergeOnReadInputFormat
     // iterator for log files
     private final Iterator<RowData> iterator;

+    // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
+    // method #reachedEnd() returns false after it returns true.
+    // refactor it out once FLINK-22370 is resolved.
+    private boolean readLogs = false;
+
     private RowData currentRecord;

     SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator<RowData> iterator) {
@@ -440,10 +447,11 @@ public class MergeOnReadInputFormat

     @Override
     public boolean reachedEnd() throws IOException {
-      if (!this.reader.reachedEnd()) {
+      if (!readLogs && !this.reader.reachedEnd()) {
         currentRecord = this.reader.nextRecord();
         return false;
       }
+      readLogs = true;
       if (this.iterator.hasNext()) {
         currentRecord = this.iterator.next();
         return false;
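
The `readLogs` latch exists because of FLINK-22370: `ParquetColumnarRowSplitReader#reachedEnd()` can report false again after having returned true. That matters because callers drive the standard Flink `InputFormat` loop, so a flip-flopping `reachedEnd()` would re-enter the exhausted reader. A sketch of the consuming loop (variable names are illustrative):

    import java.io.IOException;

    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.table.data.RowData;

    public class DrainSketch {
      // Standard InputFormat consumption loop. If reachedEnd() ever flipped
      // back to false after returning true (the FLINK-22370 bug), the loop
      // would resume and emit stale or duplicate rows; the readLogs latch
      // keeps the answer monotonic once the base file is exhausted.
      static void drain(InputFormat<RowData, ?> format) throws IOException {
        while (!format.reachedEnd()) {
          RowData row = format.nextRecord(null); // reuse object elided for brevity
          // ... hand row to the downstream operator
        }
      }
    }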
@@ -479,6 +487,12 @@ public class MergeOnReadInputFormat
     private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
     private final GenericRecordBuilder recordBuilder;

+    private final RowDataProjection projection;
+
+    // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
+    // method #reachedEnd() returns false after it returns true.
+    // refactor it out once FLINK-22370 is resolved.
+    private boolean readLogs = false;
+
     private Set<String> keyToSkip = new HashSet<>();

     private RowData currentRecord;
@@ -501,11 +515,12 @@ public class MergeOnReadInputFormat
       this.recordBuilder = new GenericRecordBuilder(requiredSchema);
       this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+      this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
     }

     @Override
     public boolean reachedEnd() throws IOException {
-      if (!this.reader.reachedEnd()) {
+      if (!readLogs && !this.reader.reachedEnd()) {
         currentRecord = this.reader.nextRecord();
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         if (logRecords.containsKey(curKey)) {
@@ -524,19 +539,18 @@ public class MergeOnReadInputFormat
             return false;
           }
         }
+        // project the full record in base with required positions
+        currentRecord = projection.project(currentRecord);
         return false;
-      } else {
-        if (logKeysIterator.hasNext()) {
+      }
+      readLogs = true;
+      while (logKeysIterator.hasNext()) {
         final String curKey = logKeysIterator.next();
-        if (keyToSkip.contains(curKey)) {
-          return reachedEnd();
-        } else {
+        if (!keyToSkip.contains(curKey)) {
           Option<IndexedRecord> insertAvroRecord =
               logRecords.get(curKey).getData().getInsertValue(tableSchema);
-          if (!insertAvroRecord.isPresent()) {
-            // stand alone delete record, skipping
-            return reachedEnd();
-          } else {
+          if (insertAvroRecord.isPresent()) {
+            // the record is a DELETE if insertAvroRecord not present, skipping
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
                 insertAvroRecord.get(),
                 requiredSchema,
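
Stripped of the Hudi types, this merge iterator performs a two-phase join of base-file rows with log records keyed by record key, where a log record without an insert value models a delete. A simplified sketch using plain collections (all names hypothetical; `null` values stand in for deletes):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class MergeSketch {
      // Phase 1: stream the base file, letting a log record with the same key
      // win; phase 2: append the log-only records that were never merged.
      static List<String> merge(List<Map.Entry<String, String>> baseRows,
                                Map<String, String> logRecords) {
        List<String> out = new ArrayList<>();
        Set<String> merged = new HashSet<>();
        for (Map.Entry<String, String> row : baseRows) {   // phase 1: base file
          String key = row.getKey();
          if (logRecords.containsKey(key)) {
            merged.add(key);                               // remember: consumed
            String logValue = logRecords.get(key);
            if (logValue != null) {                        // null = delete, drop row
              out.add(logValue);
            }
          } else {
            out.add(row.getValue());
          }
        }
        for (Map.Entry<String, String> e : logRecords.entrySet()) { // phase 2
          if (!merged.contains(e.getKey()) && e.getValue() != null) {
            out.add(e.getValue());                         // log-only inserts
          }
        }
        return out;
      }
    }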

org/apache/hudi/util/RowDataProjection.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Utilities to project the row data with given positions.
+ */
+public class RowDataProjection {
+  private final RowData.FieldGetter[] fieldGetters;
+
+  private RowDataProjection(LogicalType[] types, int[] positions) {
+    ValidationUtils.checkArgument(types.length == positions.length,
+        "types and positions should have the equal number");
+    this.fieldGetters = new RowData.FieldGetter[types.length];
+    for (int i = 0; i < types.length; i++) {
+      final LogicalType type = types[i];
+      final int pos = positions[i];
+      this.fieldGetters[i] = RowData.createFieldGetter(type, pos);
+    }
+  }
+
+  public static RowDataProjection instance(RowType rowType, int[] positions) {
+    final LogicalType[] types = rowType.getChildren().toArray(new LogicalType[0]);
+    return new RowDataProjection(types, positions);
+  }
+
+  /**
+   * Returns the projected row data.
+   */
+  public RowData project(RowData rowData) {
+    GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length);
+    for (int i = 0; i < this.fieldGetters.length; i++) {
+      final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
+      genericRowData.setField(i, val);
+    }
+    return genericRowData;
+  }
+}
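
A quick usage sketch of the new class, projecting positions {2, 0} of a three-field row into a two-field row (types and values invented for the example):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.DoubleType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.util.RowDataProjection;

    public class ProjectionExample {
      public static void main(String[] args) {
        // The row type passed to instance() describes the *projected* fields
        // in output order; the positions index into the full source row.
        RowType requiredType = RowType.of(new DoubleType(), new IntType());
        RowDataProjection projection =
            RowDataProjection.instance(requiredType, new int[] {2, 0});

        RowData full = GenericRowData.of(1, StringData.fromString("a"), 2.0d);
        RowData projected = projection.project(full); // projects to (2.0, 1)
        System.out.println(projected);
      }
    }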

TestInputFormat.java

@@ -118,6 +118,7 @@ public class TestInputFormat {
     // write parquet first with compaction
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);

     InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();