From cc81ddde01d6c743708e68c7fb6394facca1e672 Mon Sep 17 00:00:00 2001 From: hiscat <46845236+MyLanPangzi@users.noreply.github.com> Date: Wed, 21 Apr 2021 20:13:30 +0800 Subject: [PATCH] [HUDI-1812] Add explicit index state TTL option for Flink writer (#2853) --- .../java/org/apache/hudi/configuration/FlinkOptions.java | 6 ++++++ .../apache/hudi/sink/partitioner/BucketAssignFunction.java | 7 ++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 9925fc7bb..3a942affb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -80,6 +80,12 @@ public class FlinkOptions { .defaultValue(false) .withDescription("Whether to bootstrap the index state from existing hoodie table, default false"); + public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions + .key("index.state.ttl") + .doubleType() + .defaultValue(1.5D) + .withDescription("Index state ttl in days, default 1.5 day"); + // ------------------------------------------------------------------------ // Read Options // ------------------------------------------------------------------------ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index f765e9d5a..79a3f44c9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -37,17 +37,18 @@ import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; import 
org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.runtime.util.StateTtlConfigUtil; import org.apache.flink.util.Collector; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -145,6 +146,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>> "indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class)); + double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000; + if (ttl > 0) { + indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl)); + } indexState = context.getKeyedStateStore().getMapState(indexStateDesc); if (bootstrapIndex) { MapStateDescriptor<String, Integer> partitionLoadStateDesc =