diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 3d386cf8c..e08fbe314 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -127,10 +127,14 @@ public class CompactionUtil { * @param conf The configuration */ public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception { - TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); - Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile(); - if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) { - conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true); + try { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile(); + if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) { + conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true); + } + } catch (Exception e) { + LOG.warn("Could not get schema from data file, CHANGELOG_ENABLE is set to default", e); } }