[HUDI-1812] Add explicit index state TTL option for Flink writer (#2853)
This commit is contained in:
@@ -80,6 +80,12 @@ public class FlinkOptions {
|
||||
.defaultValue(false)
|
||||
.withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
|
||||
|
||||
public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
|
||||
.key("index.state.ttl")
|
||||
.doubleType()
|
||||
.defaultValue(1.5D)
|
||||
.withDescription("Index state ttl in days, default 1.5 day");
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// Read Options
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
@@ -37,17 +37,18 @@ import org.apache.hudi.table.action.commit.BucketInfo;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.api.common.state.CheckpointListener;
|
||||
import org.apache.flink.api.common.state.MapState;
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.common.typeinfo.Types;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.state.CheckpointListener;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.slf4j.Logger;
|
||||
@@ -145,6 +146,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
"indexState",
|
||||
TypeInformation.of(HoodieKey.class),
|
||||
TypeInformation.of(HoodieRecordLocation.class));
|
||||
double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
|
||||
if (ttl > 0) {
|
||||
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
|
||||
}
|
||||
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
|
||||
if (bootstrapIndex) {
|
||||
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
|
||||
|
||||
Reference in New Issue
Block a user