1
0

[HUDI-4119] the first read result is incorrect when Flink upsert- Kafka connector is used in HUDi (#5626)

* HUDI-4119 the first read result is incorrect when Flink upsert- Kafka connector is used in HUDi

Co-authored-by: aliceyyan <aliceyyan@tencent.com>
This commit is contained in:
aliceyyan
2022-05-20 18:10:24 +08:00
committed by GitHub
parent 6f37863ba8
commit 1da0b21edd

View File

@@ -20,12 +20,16 @@ package org.apache.hudi.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -381,8 +385,8 @@ public class HoodieTableSource implements
}
private InputFormat<RowData, ?> getStreamInputFormat() {
// if table does not exist, use schema from the DDL
Schema tableAvroSchema = this.metaClient == null ? inferSchemaFromDdl() : getTableAvroSchema();
// if table does not exist or table data does not exist, use schema from the DDL
Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableAvroSchema();
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
@@ -399,6 +403,15 @@ public class HoodieTableSource implements
throw new HoodieException(errMsg);
}
/**
* Returns whether the hoodie table data exists .
*/
private boolean tableDataExists() {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = activeTimeline.getLastCommitMetadataWithValidData();
return instantAndCommitMetadata.isPresent();
}
private MergeOnReadInputFormat mergeOnReadInputFormat(
RowType rowType,
RowType requiredRowType,