From d6d52c60636ae6a0c16469fa6761d0080fddf72f Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 20 Apr 2021 21:27:09 +0800 Subject: [PATCH] [HUDI-1809] Flink merge on read input split uses wrong base file path for default merge type (#2846) --- .../format/mor/MergeOnReadInputFormat.java | 38 ++++++++---- .../apache/hudi/util/RowDataProjection.java | 61 +++++++++++++++++++ .../hudi/table/format/TestInputFormat.java | 1 + 3 files changed, 88 insertions(+), 12 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 12bebdf4d..1186cff8a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -29,6 +29,7 @@ import org.apache.hudi.table.format.FormatUtils; import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader; import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; import org.apache.hudi.util.AvroToRowDataConverters; +import org.apache.hudi.util.RowDataProjection; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StringToRowDataConverter; @@ -63,6 +64,7 @@ import java.util.stream.IntStream; import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema; @@ -180,7 +182,7 @@ public class MergeOnReadInputFormat new Schema.Parser().parse(this.tableState.getAvroSchema()), new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()), this.requiredPos, - getFullSchemaReader(split.getTablePath())); + getFullSchemaReader(split.getBasePath().get())); } else { throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for " + "file path: " + split.getBasePath() @@ -337,7 +339,7 @@ public class MergeOnReadInputFormat // efficient. if (split.getInstantRange().isPresent()) { // based on the fact that commit time is always the first field - String commitTime = curAvroRecord.get().get(0).toString(); + String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString(); if (!split.getInstantRange().get().isInRange(commitTime)) { // filter out the records that are not in range return hasNext(); @@ -431,6 +433,11 @@ public class MergeOnReadInputFormat // iterator for log files private final Iterator iterator; + // add the flag because the flink ParquetColumnarRowSplitReader is buggy: + // method #reachedEnd() returns false after it returns true. + // refactor it out once FLINK-22370 is resolved. + private boolean readLogs = false; + private RowData currentRecord; SkipMergeIterator(ParquetColumnarRowSplitReader reader, Iterator iterator) { @@ -440,10 +447,11 @@ public class MergeOnReadInputFormat @Override public boolean reachedEnd() throws IOException { - if (!this.reader.reachedEnd()) { + if (!readLogs && !this.reader.reachedEnd()) { currentRecord = this.reader.nextRecord(); return false; } + readLogs = true; if (this.iterator.hasNext()) { currentRecord = this.iterator.next(); return false; @@ -479,6 +487,12 @@ public class MergeOnReadInputFormat private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; private final GenericRecordBuilder recordBuilder; + private final RowDataProjection projection; + // add the flag because the flink ParquetColumnarRowSplitReader is buggy: + // method #reachedEnd() returns false after it returns true. + // refactor it out once FLINK-22370 is resolved. + private boolean readLogs = false; + private Set keyToSkip = new HashSet<>(); private RowData currentRecord; @@ -501,11 +515,12 @@ public class MergeOnReadInputFormat this.recordBuilder = new GenericRecordBuilder(requiredSchema); this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); + this.projection = RowDataProjection.instance(requiredRowType, requiredPos); } @Override public boolean reachedEnd() throws IOException { - if (!this.reader.reachedEnd()) { + if (!readLogs && !this.reader.reachedEnd()) { currentRecord = this.reader.nextRecord(); final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString(); if (logRecords.containsKey(curKey)) { @@ -524,19 +539,18 @@ public class MergeOnReadInputFormat return false; } } + // project the full record in base with required positions + currentRecord = projection.project(currentRecord); return false; } else { - if (logKeysIterator.hasNext()) { + readLogs = true; + while (logKeysIterator.hasNext()) { final String curKey = logKeysIterator.next(); - if (keyToSkip.contains(curKey)) { - return reachedEnd(); - } else { + if (!keyToSkip.contains(curKey)) { Option insertAvroRecord = logRecords.get(curKey).getData().getInsertValue(tableSchema); - if (!insertAvroRecord.isPresent()) { - // stand alone delete record, skipping - return reachedEnd(); - } else { + if (insertAvroRecord.isPresent()) { + // the record is a DELETE if insertAvroRecord not present, skipping GenericRecord requiredAvroRecord = buildAvroRecordBySchema( insertAvroRecord.get(), requiredSchema, diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java new file mode 100644 index 000000000..67bb8cafa --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +/** + * Utilities to project the row data with given positions. + */ +public class RowDataProjection { + private final RowData.FieldGetter[] fieldGetters; + + private RowDataProjection(LogicalType[] types, int[] positions) { + ValidationUtils.checkArgument(types.length == positions.length, + "types and positions should have the equal number"); + this.fieldGetters = new RowData.FieldGetter[types.length]; + for (int i = 0; i < types.length; i++) { + final LogicalType type = types[i]; + final int pos = positions[i]; + this.fieldGetters[i] = RowData.createFieldGetter(type, pos); + } + } + + public static RowDataProjection instance(RowType rowType, int[] positions) { + final LogicalType[] types = rowType.getChildren().toArray(new LogicalType[0]); + return new RowDataProjection(types, positions); + } + + /** + * Returns the projected row data. + */ + public RowData project(RowData rowData) { + GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length); + for (int i = 0; i < this.fieldGetters.length; i++) { + final Object val = this.fieldGetters[i].getFieldOrNull(rowData); + genericRowData.setField(i, val); + } + return genericRowData; + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 5e324de67..fcfa7cf5f 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -118,6 +118,7 @@ public class TestInputFormat { // write parquet first with compaction conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); TestData.writeData(TestData.DATA_SET_INSERT, conf); InputFormat inputFormat = this.tableSource.getInputFormat();