diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 06305e2dc..0fac2cc49 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -42,8 +43,8 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.schema.FilebasedSchemaProvider; -import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.sink.transform.ChainedTransformer; +import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.streamer.FlinkStreamerConfig; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; @@ -168,6 +169,7 @@ public class StreamerUtil { // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) .build()) .withMemoryConfig( HoodieMemoryConfig.newBuilder() @@ -372,7 +374,7 @@ public class StreamerUtil { return fileStatus.getLen() > 0; } - + public static boolean allowDuplicateInserts(Configuration conf) { WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);