From 1da0b21edd6693e9025e45889cc7b1c37658a4e1 Mon Sep 17 00:00:00 2001 From: aliceyyan <104287562+aliceyyan@users.noreply.github.com> Date: Fri, 20 May 2022 18:10:24 +0800 Subject: [PATCH] [HUDI-4119] the first read result is incorrect when Flink upsert-kafka connector is used in Hudi (#5626) * HUDI-4119 the first read result is incorrect when Flink upsert-kafka connector is used in Hudi Co-authored-by: aliceyyan --- .../apache/hudi/table/HoodieTableSource.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index bad592aa2..183685738 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -20,12 +20,16 @@ package org.apache.hudi.table; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; @@ -381,8 +385,8 @@ public class HoodieTableSource implements } private InputFormat getStreamInputFormat() { - // if table does not exist, use schema 
from the DDL - Schema tableAvroSchema = this.metaClient == null ? inferSchemaFromDdl() : getTableAvroSchema(); + // if table does not exist or table data does not exist, use schema from the DDL + Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableAvroSchema(); final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType(); @@ -399,6 +403,15 @@ public class HoodieTableSource implements throw new HoodieException(errMsg); } + /** + * Returns whether the hoodie table data exists. + */ + private boolean tableDataExists() { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = activeTimeline.getLastCommitMetadataWithValidData(); + return instantAndCommitMetadata.isPresent(); + } + private MergeOnReadInputFormat mergeOnReadInputFormat( RowType rowType, RowType requiredRowType,